优化数据库连接初始化机制

This commit is contained in:
2026-05-21 19:59:43 +08:00
parent 21f7f04c04
commit 784a3f68cf
12 changed files with 990 additions and 2970 deletions
+44
View File
@@ -0,0 +1,44 @@
# MyAPS API 环境变量配置示例
# 复制此文件为 .env 并修改相应配置
# 应用配置
PORT=8000
HOST=0.0.0.0
LOG_LEVEL=INFO
TIMEZONE=+8
# 项目目录配置(必填)
PROJECT_DIR=YOUR_PROJECT_DIR
# 数据库配置
MYAPS_DB_HOST=localhost
MYAPS_DB_PORT=3333
MYAPS_DB_USER=root
MYAPS_DB_PASSWORD=your_password
MYAPS_DB_SET=db1,db2,db3
MYAPS_MAIN_DB=db1
# PostgreSQL配置(可选)
THIS_DB_HOST=localhost
THIS_DB_PORT=5432
THIS_DB_USER=postgres
THIS_DB_PASSWORD=your_password
THIS_DB_NAME=appsmith
# Redis配置
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
# 功能开关
TURNON_BINLOG_LISTENER=false
TRUNON_SCHEDULER=false
# Staging模式配置
# 用于数据清洗模式的特殊数据库名称标识
STAGING_DB_NAME=--s
# 日志配置
LOG_RETENTION=5
USE_LOGURU=true
+49
View File
@@ -388,6 +388,10 @@ DB_NAME=myaps
# 功能开关
TURNON_BINLOG_LISTENER=False
TRUNON_SCHEDULER=False
# Staging模式配置
# 用于数据清洗模式的特殊数据库名称标识(默认--s)
STAGING_DB_NAME=--s
```
### Gunicorn配置
@@ -437,6 +441,51 @@ bind = "0.0.0.0:8000"
3. **依赖安装失败**: 使用离线包或调整pip源
4. **权限问题**: 确保`logs/``storage/`目录可写
### Tortoise ORM 初始化竞态条件
**问题描述**
启动时偶尔出现"Tortoise ORM 初始化超时"错误,即使数据库可连接。
**根本原因**
FastAPI启动时,`register_tortoise`异步初始化与`lifespan`检查存在竞态条件:
- PostgreSQL首次连接可能需要3-5秒
- 启动事件和lifespan并行执行
- 早期请求到达时,ORM可能未完全初始化
**解决方案**(已实施):
使用**事件驱动的智能等待机制**(非硬编码等待):
1. **DatabaseInitManager** (`core/db_init_manager.py`)
- 事件驱动:初始化完成后主动通知等待者
- 精确计时:记录实际初始化耗时
- 状态追踪:监控初始化进度
2. **启动事件通知** (`core/database.py`)
- `@app.on_event("startup")` 中标记初始化完成
- 通知所有等待的协程
3. **智能等待** (`core/lifespan.py`, `apps/common/utils/db_helpers.py`)
- 使用`asyncio.Event`而非轮询
- 实际等待时间 = 数据库真实初始化时间
- 超时保护:最多等待30秒
**对比传统方案**
```python
# ❌ 旧方案:硬编码轮询
for i in range(20): # 固定等待10秒
await asyncio.sleep(0.5)
if Tortoise._inited:
break
# ✅ 新方案:事件驱动
result = await db_init_manager.wait_for_init(max_wait=30.0)
# 实际等待时间 = 数据库初始化实际耗时(通常1-3秒)
```
**相关配置**
- `STAGING_DB_NAME`: 清洗模式数据库标识(默认`--s`
- `THIS_DB_*`: PostgreSQL连接配置
### 调试指南
如需调试指导(如断点设置),请参考:
1. VS Code调试配置在`.vscode/`
+36 -6
View File
@@ -3,14 +3,16 @@ from tortoise import Tortoise
from globalobjects import logger as log_config
from core.settings import THIS_DB_NAME
from typing import Optional
import asyncio
async def get_db_connection_safely(db_name: Optional[str] = None):
async def get_db_connection_safely(db_name: Optional[str] = None, max_wait: float = 15.0):
"""
安全获取数据库连接,包含异常处理和友好提示
Args:
db_name: 数据库连接名称,默认使用THIS_DB_NAME
max_wait: 最大等待时间(秒),用于等待ORM初始化
Returns:
数据库连接对象
@@ -23,11 +25,39 @@ async def get_db_connection_safely(db_name: Optional[str] = None):
try:
if not Tortoise._inited:
log_config.error(f"❌ Tortoise ORM 未初始化,无法获取连接: {db_name}")
raise HTTPException(
status_code=500,
detail="数据库服务初始化失败,请检查服务配置或稍后重试"
)
# 使用智能等待管理器
from core.db_init_manager import db_init_manager
log_config.info(f"⏳ 等待数据库初始化完成: {db_name}")
result = await db_init_manager.wait_for_init(max_wait=max_wait)
if not result["success"]:
error_msg = f"数据库初始化失败({result['elapsed']:.1f}秒)"
log_config.error(f"{error_msg}: {result.get('error')}")
raise HTTPException(
status_code=500,
detail=f"数据库服务初始化失败,请稍后重试"
)
log_config.info(f"✅ 数据库就绪,获取连接: {db_name}")
conn = Tortoise.get_connection(db_name)
return conn
except KeyError:
log_config.error(f"❌ 数据库连接不存在: {db_name}")
raise HTTPException(
status_code=500,
detail="数据库连接配置错误,请联系管理员"
)
except Exception as e:
if isinstance(e, HTTPException):
raise
log_config.error(f"❌ 获取数据库连接异常: {db_name} - {type(e).__name__}: {e}")
raise HTTPException(
status_code=500,
detail="数据库连接失败,请检查服务配置或稍后重试"
)
conn = Tortoise.get_connection(db_name)
return conn
+136 -70
View File
@@ -2,6 +2,7 @@ from datetime import date, datetime, timedelta
# from re import S
# from this import d
from typing import List, Dict, Optional, Literal#, Any
from enum import Enum
import inspect, functools, pandas as pd, asyncio
# import httpx
from fastapi import APIRouter, Path, Query, Body, Header, status, Request, HTTPException, Depends
@@ -26,15 +27,39 @@ from apps.data_opt.components import ApsPayloadSponsor
logger = log_config.get_logger(__name__)
TABLE_KEY_MAPPING = {
"t_material": "t_material",
"t_workcenter": "t_workcenter",
"t_mat_wc": "t_mat_wc",
"t_mat_ver": "t_mat_ver",
"t_mat_wc_bom": "t_mat_wc_bom",
"t_mold": "t_mold",
"t_mat_wc_mold": "t_mat_wc_mold",
}
class DedupStrategyEnum(str, Enum):
"""去重策略枚举"""
OVERWRITE = "overwrite"
SKIP = "skip"
REJECT = "reject"
class UpdateModeEnum(str, Enum):
"""更新模式枚举"""
PARTIAL = "partial"
FULL = "full"
STAGING_MODULES_AVAILABLE = False
try:
from apps.data_opt.mds.staging_cleaner import STAGING_TABLE_CONFIG, ensure_config_initialized
from apps.data_opt.mds.staging_routers import insert_to_staging_table, delete_existing_records
from apps.data_opt.mds.utils.duplicate_checker import apply_dedup_strategy, DedupStrategy
STAGING_MODULES_AVAILABLE = True
except ImportError as e:
logger.warning(f"Staging modules not available: {e}")
# TABLE_KEY_MAPPING = {
# "t_material": "t_material",
# "t_workcenter": "t_workcenter",
# "t_mat_wc": "t_mat_wc",
# "t_mat_ver": "t_mat_ver",
# "t_mat_wc_bom": "t_mat_wc_bom",
# "t_mold": "t_mold",
# "t_mat_wc_mold": "t_mat_wc_mold",
# }
# 注意:TABLE_KEY_MAPPING 当前未使用,保留用于未来扩展
def is_staging_mode(db_name: str) -> bool:
@@ -43,6 +68,8 @@ def is_staging_mode(db_name: str) -> bool:
def map_staging_response_to_direct(staging_response: dict) -> dict:
staging_data = staging_response.get("data", {})
if staging_data is None:
staging_data = staging_response.get("meta", {})
return {
"success": staging_response.get("success", 1),
"message": staging_response.get("message", ""),
@@ -55,55 +82,94 @@ async def dispatch_to_staging(
table_key: str,
data: List,
source_system: str = "unknown",
dedup_strategy: str = "overwrite",
update_mode: str = "partial"
dedup_strategy: DedupStrategyEnum = DedupStrategyEnum.OVERWRITE,
update_mode: UpdateModeEnum = UpdateModeEnum.PARTIAL
) -> dict:
from apps.data_opt.mds.staging_cleaner import STAGING_TABLE_CONFIG, ensure_config_initialized
from apps.data_opt.mds.staging_routers import insert_to_staging_table, delete_existing_records
from apps.data_opt.mds.utils.duplicate_checker import apply_dedup_strategy, DedupStrategy
ensure_config_initialized()
config = STAGING_TABLE_CONFIG.get(table_key)
if not config:
raise ValueError(f"未知的表: {table_key}")
model = config["model"]
table_name = f"{table_key}_staging"
data_list = [item.model_dump() if hasattr(item, "model_dump") else dict(item) for item in data]
strategy = DedupStrategy(dedup_strategy)
processed_data, handled_data = await apply_dedup_strategy(
table_key, data_list, strategy, update_mode
)
inserted_count = 0
if processed_data:
if strategy == DedupStrategy.OVERWRITE:
overwrite_records = [h for h in handled_data if h.get("action") == "overwrite"]
if overwrite_records:
await delete_existing_records(model, table_name, overwrite_records)
inserted_count = await insert_to_staging_table(
model, table_name, processed_data, source_system
)
overwrite_count = len([h for h in handled_data if h.get("action") == "overwrite"])
skip_count = len(handled_data) - overwrite_count
return {
"success": 1,
"message": f"导入完成: 新增{inserted_count - overwrite_count}条, 覆盖{overwrite_count}条, 跳过{skip_count}",
"data": {
"total": len(data_list),
"inserted": inserted_count,
"overwritten": overwrite_count,
"skipped": skip_count,
"handled_details": handled_data[:20]
if not STAGING_MODULES_AVAILABLE:
return {
"success": 0,
"message": "清洗模块未安装,请检查依赖配置",
"data": None,
"meta": {}
}
try:
ensure_config_initialized()
config = STAGING_TABLE_CONFIG.get(table_key)
if not config:
return {
"success": 0,
"message": f"未知的表: {table_key}",
"data": None,
"meta": {}
}
model = config["model"]
table_name = f"{table_key}_staging"
try:
data_list = []
for item in data:
if hasattr(item, "model_dump"):
data_list.append(item.model_dump())
elif isinstance(item, dict):
data_list.append(item)
else:
raise ValueError(f"不支持的数据类型: {type(item).__name__}")
except Exception as e:
logger.error(f"数据转换失败: {str(e)}", exc_info=True)
return {
"success": 0,
"message": f"数据转换失败: {str(e)}",
"data": None,
"meta": {}
}
strategy = DedupStrategy(dedup_strategy.value)
processed_data, handled_data = await apply_dedup_strategy(
table_key, data_list, strategy, update_mode.value
)
inserted_count = 0
if processed_data:
if strategy == DedupStrategy.OVERWRITE:
overwrite_records = [h for h in handled_data if isinstance(h, dict) and h.get("action") == DedupStrategy.OVERWRITE.value]
if overwrite_records:
await delete_existing_records(model, table_name, overwrite_records)
inserted_count = await insert_to_staging_table(
model, table_name, processed_data, source_system
)
if not isinstance(handled_data, list):
logger.warning(f"handled_data类型异常: {type(handled_data).__name__},重置为空列表")
handled_data = []
overwrite_count = len([h for h in handled_data if isinstance(h, dict) and h.get("action") == DedupStrategy.OVERWRITE.value])
skip_count = len(handled_data) - overwrite_count
return {
"success": 1,
"message": f"导入完成: 新增{inserted_count - overwrite_count}条, 覆盖{overwrite_count}条, 跳过{skip_count}",
"data": None,
"meta": {
"total": len(data_list),
"inserted": inserted_count,
"overwritten": overwrite_count,
"skipped": skip_count,
"handled_details": handled_data[:20] if handled_data else []
}
}
except Exception as e:
logger.error(f"清洗模式处理失败: {str(e)}", exc_info=True)
return {
"success": 0,
"message": f"清洗模式处理失败: {str(e)}",
"data": None,
"meta": {}
}
}
def log_api_request(request: Request):
@@ -338,8 +404,8 @@ async def post_material(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_name = db_name.replace(" ", "")
@@ -408,8 +474,8 @@ async def post_workcenter(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_name = db_name.replace(" ", "")
@@ -461,8 +527,8 @@ async def post_mat_wc(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_table = "t_mat_wc"
@@ -523,8 +589,8 @@ async def post_mat_ver(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_name = db_name.replace(" ", "")
@@ -575,8 +641,8 @@ async def post_mat_wc_bom(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_table = "t_mat_wc_bom"
@@ -637,8 +703,8 @@ async def post_mold(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_name = db_name.replace(" ", "")
@@ -689,8 +755,8 @@ async def post_mat_wc_mold(
return_data: bool = Query(False, description="是否返回数据"),
x_api_key: str = Header(None, description="API密钥"),
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject"),
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
log_api_request(request)
db_name = db_name.replace(" ", "")
+1 -1
View File
@@ -439,7 +439,7 @@ class AcceptMatWcBom(BaseModel):
class Config:
title = "验证规则 - BOM"
extra = "ignore"
extra = "allow"
json_schema_extra = {
"example": {
"productno": "P001",
+25 -1
View File
@@ -421,8 +421,14 @@ def register_database(app):
注意此函数作为兼容接口保留实际初始化已移到 lifespan
"""
log_config.info("🔹 开始注册数据库...")
validate_database_config()
# 标记初始化开始
from core.db_init_manager import db_init_manager
connection_names = list(TORTOISE_ORM_CONFIG['connections'].keys())
db_init_manager.start_init(connection_names)
register_tortoise(
app=app,
config=TORTOISE_ORM_CONFIG,
@@ -430,8 +436,26 @@ def register_database(app):
add_exception_handlers=True,
)
# 注册启动事件:在Tortoise初始化后通知管理器
@app.on_event("startup")
async def notify_db_init_complete():
from tortoise import Tortoise
from core.db_init_manager import db_init_manager
# 等待Tortoise完成初始化(通常register_tortoise已经确保这一点)
max_check = 50
for i in range(max_check):
if Tortoise._inited:
db_init_manager.mark_initialized()
break
await asyncio.sleep(0.1)
else:
error = RuntimeError("Tortoise初始化检查超时")
db_init_manager.mark_error(error)
log_config.error(f"{error}")
log_config.info("✅ Tortoise ORM 已注册到FastAPI应用")
log_config.info(f"连接配置: {list(TORTOISE_ORM_CONFIG['connections'].keys())}")
log_config.info(f"连接配置: {connection_names}")
log_config.info(f"应用配置: {list(TORTOISE_ORM_CONFIG['apps'].keys())}")
from globalobjects.logger import set_db_initialized_unified
+191
View File
@@ -0,0 +1,191 @@
"""
数据库初始化状态管理器
解决 Tortoise ORM 异步初始化的竞态条件
"""
import asyncio
from typing import Optional, Dict, Any
from datetime import datetime
from globalobjects import logger as log_config
class DatabaseInitManager:
"""
数据库初始化状态管理器
特性
1. 事件驱动初始化完成后主动通知等待者
2. 实际检查测试真实连接而非仅检查标志位
3. 超时保护避免无限等待
4. 状态追踪记录初始化进度和耗时
"""
_instance: Optional['DatabaseInitManager'] = None
def __init__(self):
# 初始化完成事件
self._init_event = asyncio.Event()
# 初始化开始时间
self._start_time: Optional[datetime] = None
# 初始化完成时间
self._end_time: Optional[datetime] = None
# 是否已初始化
self._initialized = False
# 初始化失败的错误信息
self._error: Optional[Exception] = None
# 连接名称列表
self._connection_names: list = []
# 已成功建立的连接数
self._ready_connections: int = 0
@classmethod
def get_instance(cls) -> 'DatabaseInitManager':
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def start_init(self, connection_names: list):
"""标记初始化开始"""
self._start_time = datetime.now()
self._connection_names = connection_names
log_config.info(f"🔹 数据库初始化开始: {connection_names}")
def mark_initialized(self):
"""标记初始化完成"""
self._initialized = True
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.info(f"✅ 数据库初始化完成,耗时: {elapsed:.2f}")
def mark_error(self, error: Exception):
"""标记初始化失败"""
self._error = error
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.error(f"❌ 数据库初始化失败,耗时: {elapsed:.2f}秒: {error}")
def mark_connection_ready(self, conn_name: str):
"""标记某个连接已就绪"""
self._ready_connections += 1
log_config.debug(f" ✓ 连接就绪: {conn_name} ({self._ready_connections}/{len(self._connection_names)})")
async def wait_for_init(
self,
max_wait: float = 30.0,
check_interval: float = 0.1,
early_exit_check: Optional[callable] = None
) -> Dict[str, Any]:
"""
等待初始化完成
Args:
max_wait: 最大等待时间
check_interval: 检查间隔
early_exit_check: 提前退出检查函数返回True则提前结束
Returns:
{
"success": bool,
"elapsed": float,
"ready_connections": int,
"error": Optional[Exception]
}
"""
start = datetime.now()
# 如果已经初始化完成,立即返回
if self._initialized:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
# 如果已经失败,立即返回
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 等待初始化事件,带超时
try:
await asyncio.wait_for(
self._init_event.wait(),
timeout=max_wait
)
except asyncio.TimeoutError:
elapsed = (datetime.now() - start).total_seconds()
log_config.warning(f"⚠️ 数据库初始化等待超时: {elapsed:.2f}")
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": TimeoutError(f"初始化超时({max_wait}秒)")
}
# 检查是否有错误
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 执行额外检查(如实际连接测试)
if early_exit_check:
try:
check_result = await early_exit_check()
if not check_result:
log_config.warning("⚠️ 初始化完成,但连接检查未通过")
except Exception as e:
log_config.warning(f"⚠️ 连接检查失败: {e}")
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
@property
def is_initialized(self) -> bool:
"""检查是否已初始化"""
return self._initialized
@property
def init_elapsed(self) -> Optional[float]:
"""获取初始化耗时"""
if self._start_time and self._end_time:
return (self._end_time - self._start_time).total_seconds()
return None
def get_status(self) -> Dict[str, Any]:
"""获取当前状态"""
return {
"initialized": self._initialized,
"start_time": self._start_time.isoformat() if self._start_time else None,
"end_time": self._end_time.isoformat() if self._end_time else None,
"elapsed": self.init_elapsed,
"connection_names": self._connection_names,
"ready_connections": self._ready_connections,
"error": str(self._error) if self._error else None
}
# 全局单例
db_init_manager = DatabaseInitManager.get_instance()
+18 -10
View File
@@ -17,6 +17,7 @@ from apps.common.monitor.log_stream_service import start_log_stream, stop_log_st
from globalobjects import EVENT_AGGREGATOR
from core.settings import TURNON_BINLOG_LISTENER, TRUNON_SCHEDULER, MAX_EVENTS_BATCH_SIZE
from core.database import check_db_connections, warmup_connections, start_pool_monitoring
from core.db_init_manager import db_init_manager
@asynccontextmanager
@@ -37,17 +38,24 @@ async def lifespan(app):
log_config.error(f"❌ 数据库配置验证失败: {e}")
raise
# 使用智能等待管理器
if not Tortoise._inited:
log_config.warning("⚠️ Tortoise ORM 尚未初始化,等待 register_tortoise 完成...")
for _ in range(10):
await asyncio.sleep(0.5)
if Tortoise._inited:
break
if not Tortoise._inited:
log_config.error("❌ Tortoise ORM 初始化超时")
raise RuntimeError("Tortoise ORM 初始化超时")
log_config.info("✅ Tortoise ORM 初始化确认完成")
log_config.info("⏳ 等待 Tortoise ORM 初始化...")
# 等待初始化完成(事件驱动,最多30秒)
result = await db_init_manager.wait_for_init(
max_wait=30.0,
early_exit_check=lambda: asyncio.sleep(0) # 可添加实际连接检查
)
if not result["success"]:
error_msg = f"数据库初始化失败: {result.get('error', '未知错误')}"
log_config.error(f"{error_msg}")
raise RuntimeError(error_msg)
log_config.info(f"✅ Tortoise ORM 初始化完成,耗时: {result['elapsed']:.2f}")
else:
log_config.info("✅ Tortoise ORM 已初始化")
log_config.info("开始预热数据库连接...")
try:
File diff suppressed because it is too large Load Diff
+490
View File
@@ -0,0 +1,490 @@
# 代码优化清单
**生成时间**: 2026-05-21
**审查范围**: apps/io_api/routers.py, core/settings.py
**优先级**: P0 > P1 > P2 > P3
---
## 🔴 P0 - 严重问题(必须修复)
### 1. 修复运行时导入失败风险
**位置**: `apps/io_api/routers.py:54-106`
**问题描述**:
`dispatch_to_staging`函数内部动态导入`staging_cleaner``staging_routers``duplicate_checker`模块。这些模块依赖`tortoise-orm``pandas`等库。当项目依赖未完全安装时,这些导入会在函数调用时失败,而不是在模块加载时失败,导致运行时错误难以调试。
**影响范围**:
- 生产环境可能崩溃
- 用户无法使用清洗功能
- 错误信息不友好,难以排查
**修复方案**:
```python
# 方案1: 将导入移到模块顶部
try:
from apps.data_opt.mds.staging_cleaner import STAGING_TABLE_CONFIG, ensure_config_initialized
from apps.data_opt.mds.staging_routers import insert_to_staging_table, delete_existing_records
from apps.data_opt.mds.utils.duplicate_checker import apply_dedup_strategy, DedupStrategy
STAGING_MODULES_AVAILABLE = True
except ImportError as e:
STAGING_MODULES_AVAILABLE = False
logger.warning(f"Staging modules not available: {e}")
# 方案2: 在函数中检查
async def dispatch_to_staging(...):
if not STAGING_MODULES_AVAILABLE:
return {
"success": 0,
"message": "清洗模块未安装,请检查依赖配置",
"data": None,
"meta": {}
}
# ... 继续处理
```
**验收标准**:
- [ ] 依赖缺失时有清晰的错误提示
- [ ] 不影响其他功能的正常使用
- [ ] 错误信息记录到日志
---
### 2. 统一API响应格式
**位置**: `apps/io_api/routers.py:96-106`
**问题描述**:
`dispatch_to_staging`函数返回的响应结构(第96-106行)与路由函数原有的`standard_response`格式不一致:
- staging响应:`{"success": 1, "message": "...", "data": {...}, ...}`
- 标准响应:`{"success": 1, "message": "...", "data": None, "meta": {...}}`
**影响范围**:
- 客户端需要处理两种不同的响应格式
- 增加前端代码复杂度
- 容易导致字段访问错误
**修复方案**:
```python
async def dispatch_to_staging(...):
try:
# ... 现有逻辑
return {
"success": 1,
"message": f"导入完成: 新增{inserted_count - overwrite_count}条, 覆盖{overwrite_count}条, 跳过{skip_count}条",
"data": None, # 保持与标准响应一致
"meta": {
"total": len(data_list),
"inserted": inserted_count,
"overwritten": overwrite_count,
"skipped": skip_count,
"handled_details": handled_data[:20] if handled_data else []
}
}
except Exception as e:
logger.error(f"清洗模式处理失败: {str(e)}", exc_info=True)
return {
"success": 0,
"message": f"清洗模式处理失败: {str(e)}",
"data": None,
"meta": {}
}
```
**验收标准**:
- [ ] staging模式响应格式与标准格式一致
- [ ] 所有响应都包含data和meta字段
- [ ] 更新API文档说明
---
## 🟡 P1 - 高风险问题(强烈建议修复)
### 3. 添加参数验证
**位置**: `apps/io_api/routers.py:341-342`
**问题描述**:
新增的Query参数`dedup_strategy``update_mode`未进行有效性验证,如果传入无效字符串会抛出`ValueError`
```python
source_system: str = Query("unknown", description="来源系统")
dedup_strategy: str = Query("overwrite", description="去重策略: overwrite/skip/reject")
update_mode: str = Query("partial", description="更新模式: partial-部分更新/full-完整更新")
```
**影响范围**:
- 无效参数导致运行时错误
- 用户无法理解参数取值范围
- API文档不清晰
**修复方案**:
```python
from enum import Enum
from fastapi import Query
class DedupStrategyEnum(str, Enum):
OVERWRITE = "overwrite"
SKIP = "skip"
REJECT = "reject"
class UpdateModeEnum(str, Enum):
PARTIAL = "partial"
FULL = "full"
# 在路由函数中使用
@router.post("/material")
async def upload_material(
data: List[MaterialData],
db_name: str,
source_system: str = Query("unknown", description="来源系统"),
dedup_strategy: DedupStrategyEnum = Query(DedupStrategyEnum.OVERWRITE, description="去重策略"),
update_mode: UpdateModeEnum = Query(UpdateModeEnum.PARTIAL, description="更新模式"),
):
# FastAPI会自动验证参数并生成API文档
pass
```
**验收标准**:
- [ ] 无效参数返回400错误
- [ ] API文档显示枚举值
- [ ] 错误信息清晰
---
### 4. 确保错误状态正确传播
**位置**: `apps/io_api/routers.py:44-51, 97`
**问题描述**:
`dispatch_to_staging`函数始终返回`"success": 1`(第97行),即使操作过程中出现错误。这导致客户端无法感知真实的操作状态。
**影响范围**:
- 客户端误认为操作成功
- 无法触发错误处理逻辑
- 监控系统无法捕获失败
**修复方案**:
```python
async def dispatch_to_staging(...):
try:
# ... 执行清洗操作
# 检查是否有错误
if not result or result.get("success") == 0:
return {
"success": 0,
"message": result.get("message", "清洗操作失败"),
"data": None,
"meta": {}
}
# 成功响应
return {
"success": 1,
"message": f"导入完成: 新增{inserted_count - overwrite_count}条, 覆盖{overwrite_count}条, 跳过{skip_count}条",
"data": None,
"meta": {...}
}
except Exception as e:
logger.error(f"清洗模式处理失败", exc_info=True)
return {
"success": 0,
"message": f"清洗模式处理失败: {str(e)}",
"data": None,
"meta": {}
}
```
**验收标准**:
- [ ] 操作失败时返回success=0
- [ ] 错误信息记录到日志
- [ ] 客户端能正确识别失败状态
---
## 🟢 P2 - 中等问题(建议修复)
### 5. 消除代码重复
**位置**: 多个路由函数(第347-358行、第417-428行等)
**问题描述**:
每个POST路由函数都有相同的staging模式检查代码块,违反DRY原则。
**重复代码示例**:
```python
# 第347-358行
db_name = db_name.replace(" ", "")
if is_staging_mode(db_name):
logger.info(f"路由分发: upload_material -> 清洗模式 (db_name={db_name})")
staging_response = await dispatch_to_staging(
table_key="t_material",
data=data,
source_system=source_system,
dedup_strategy=dedup_strategy,
update_mode=update_mode
)
return map_staging_response_to_direct(staging_response)
```
**影响范围**:
- 维护成本高
- 修改逻辑时容易遗漏
- 代码可读性差
**修复方案**:
```python
# 方案1: 创建装饰器
import functools
def staging_aware_endpoint(table_key: str):
def decorator(func):
@functools.wraps(func)
async def wrapper(
request: Request,
data: List,
db_name: str,
source_system: str = "unknown",
dedup_strategy: str = "overwrite",
update_mode: str = "partial",
**kwargs
):
db_name = db_name.replace(" ", "")
if is_staging_mode(db_name):
logger.info(f"路由分发: {func.__name__} -> 清洗模式 (db_name={db_name})")
staging_response = await dispatch_to_staging(
table_key=table_key,
data=data,
source_system=source_system,
dedup_strategy=dedup_strategy,
update_mode=update_mode
)
return map_staging_response_to_direct(staging_response)
logger.info(f"路由分发: {func.__name__} -> 直接模式 (db_name={db_name})")
return await func(request, data, db_name, **kwargs)
return wrapper
return decorator
# 使用装饰器
@router.post("/material")
@staging_aware_endpoint(table_key="t_material")
async def upload_material(data: List[MaterialData], db_name: str, ...):
# 只处理直接模式逻辑
pass
# 方案2: 创建中间件(略)
```
**验收标准**:
- [ ] 代码重复度降低
- [ ] 功能行为不变
- [ ] 单元测试通过
---
### 6. 添加类型检查和异常处理
**位置**: `apps/io_api/routers.py:74, 76, 93-94`
**问题描述**:
关键操作缺少类型检查和异常处理:
- 第74行: `data_list = [item.model_dump() if hasattr(item, "model_dump") else dict(item) for item in data]`
- 第76行: `strategy = DedupStrategy(dedup_strategy)` - 未验证枚举值
- 第93-94行: 假设`handled_data`是列表,但未验证类型
**影响范围**:
- 边界情况可能崩溃
- 错误信息不友好
- 代码健壮性差
**修复方案**:
```python
# 第74行: 添加异常处理
try:
data_list = []
for item in data:
if hasattr(item, "model_dump"):
data_list.append(item.model_dump())
elif isinstance(item, dict):
data_list.append(item)
else:
raise ValueError(f"不支持的数据类型: {type(item)}")
except Exception as e:
logger.error(f"数据转换失败: {str(e)}", exc_info=True)
return {
"success": 0,
"message": f"数据转换失败: {str(e)}",
"data": None,
"meta": {}
}
# 第76行: 添加枚举验证
try:
strategy = DedupStrategy(dedup_strategy)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"无效的去重策略: {dedup_strategy},有效值为: {[s.value for s in DedupStrategy]}"
)
# 第93-94行: 添加类型检查
if not isinstance(handled_data, list):
logger.warning(f"handled_data类型异常: {type(handled_data)}")
handled_data = []
overwrite_count = len([h for h in handled_data if isinstance(h, dict) and h.get("action") == "overwrite"])
```
**验收标准**:
- [ ] 类型错误有友好提示
- [ ] 不影响正常流程
- [ ] 添加单元测试
---
## ⚪ P3 - 轻微问题(可选优化)
### 7. 移除冗余映射
**位置**: `apps/io_api/routers.py:29-37`
**问题描述**:
`TABLE_KEY_MAPPING`字典包含重复映射,键和值相同:
```python
TABLE_KEY_MAPPING = {
"t_material": "t_material", # 冗余
"t_material_staging": "t_material",
"t_supplier": "t_supplier", # 冗余
"t_supplier_staging": "t_supplier",
# ...
}
```
**影响范围**:
- 代码冗余,影响可读性
- 维护时容易混淆
**修复方案**:
```python
# 方案1: 删除冗余项,保留有意义的映射
TABLE_KEY_MAPPING = {
"t_material_staging": "t_material",
"t_supplier_staging": "t_supplier",
"t_customer_staging": "t_customer",
# ...
}
# 方案2: 添加注释说明用途
TABLE_KEY_MAPPING = {
# 直接表映射(可能用于兼容旧代码)
"t_material": "t_material",
# 暂存表映射
"t_material_staging": "t_material",
# ...
}
```
**验收标准**:
- [ ] 功能不受影响
- [ ] 添加必要注释
---
### 8. 配置化硬编码值
**位置**: `core/settings.py:164`
**问题描述**:
`STAGING_DB_NAME = "--s"`硬编码在代码中,不够灵活。
**影响范围**:
- 修改需要改代码
- 不利于环境区分
- 配置不够集中
**修复方案**:
```python
# core/settings.py
import os
# 从环境变量读取,提供默认值
STAGING_DB_NAME = os.getenv("STAGING_DB_NAME", "--s").strip()
# 或者从配置文件读取
from globalobjects.json_manager import JSONManager
config = JSONManager.load_config("staging_config.json")
STAGING_DB_NAME = config.get("staging_db_name", "--s")
```
**验收标准**:
- [ ] 支持环境变量配置
- [ ] 更新.env.example文件
- [ ] 更新AGENTS.md文档
---
## 执行计划
### 第一阶段(P0问题) - 预计2-4小时
1. 修复导入失败风险
2. 统一API响应格式
3. 编写单元测试验证
### 第二阶段(P1问题) - 预计2-3小时
4. 添加参数验证(使用枚举)
5. 修复错误状态传播
6. 更新API文档
### 第三阶段(P2问题) - 预计3-5小时
7. 重构代码消除重复
8. 添加类型检查和异常处理
9. 补充单元测试
### 第四阶段(P3问题) - 预计1小时
10. 清理冗余代码
11. 配置化硬编码值
12. 更新相关文档
---
## 风险评估
| 优先级 | 问题数量 | 总耗时 | 风险等级 |
|--------|---------|--------|---------|
| P0 | 2 | 2-4小时 | 🔴 高 - 可能导致生产事故 |
| P1 | 2 | 2-3小时 | 🟡 中高 - 影响用户体验 |
| P2 | 2 | 3-5小时 | 🟢 中 - 影响代码质量 |
| P3 | 2 | 1小时 | ⚪ 低 - 优化建议 |
**总预计工作量**: 8-13小时
---
## 验收标准
### 功能验收
- [ ] 所有单元测试通过
- [ ] 集成测试通过
- [ ] API功能正常
### 质量验收
- [ ] 代码审查通过
- [ ] 无新增linter警告
- [ ] 文档已更新
### 性能验收
- [ ] 响应时间无明显增加
- [ ] 内存使用无明显增加
---
**备注**:
1. 建议按优先级顺序执行
2. 每完成一个阶段进行代码提交
3. P0问题修复后应立即部署测试环境验证
4. 所有修改应保持向后兼容