From 5dcf6859d8a1fa1bed5594d99576c514af605b9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B6=85=E5=93=A5?= <2982212683@qq.com> Date: Wed, 20 May 2026 22:26:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/monitor/routers.py | 86 +++++ apps/common/utils/db_helpers.py | 48 +++ apps/data_opt/components/yonyou_tplus.py | 8 +- apps/data_opt/mds/staging_routers.py | 25 +- apps/io_api/routers.py | 2 +- apps/io_api/schemas.py | 40 +-- core/database.py | 153 ++++++++- core/lifespan.py | 18 +- docs/database_optimization_todo.md | 387 +++++++++++++++++++++++ main.py | 2 +- 10 files changed, 710 insertions(+), 59 deletions(-) create mode 100644 apps/common/utils/db_helpers.py create mode 100644 docs/database_optimization_todo.md diff --git a/apps/common/monitor/routers.py b/apps/common/monitor/routers.py index bfe35c7..f374e26 100644 --- a/apps/common/monitor/routers.py +++ b/apps/common/monitor/routers.py @@ -11,6 +11,7 @@ from datetime import datetime, timedelta, timezone from fastapi import APIRouter, HTTPException, WebSocket from fastapi.responses import JSONResponse, StreamingResponse from typing import Dict, Any, List, Optional +from tortoise import Tortoise from .service import monitor_service from .log_stream_service import log_stream_service from .storage import request_storage, outbound_request_storage, system_log_storage @@ -43,6 +44,91 @@ async def health_check(): return await monitor_service.get_health_status() +@router.get("/health/database") +async def check_database_health() -> Dict[str, Any]: + """ + 检查所有数据库连接状态 + + Returns: + { + "status": "healthy" | "degraded" | "unhealthy", + "connections": {...}, + "tortoise_initialized": bool + } + """ + result = { + "status": "healthy", + "connections": {}, + "tortoise_initialized": Tortoise._inited + } + + if not Tortoise._inited: + result["status"] = "unhealthy" + result["error"] = "Tortoise ORM 未初始化" + return result + + unhealthy_count = 0 + for db_name in Tortoise._connections.keys(): + try: + conn = Tortoise.get_connection(db_name) + start_time = time.time() + + await conn.execute_query("SELECT 1") + + response_time_ms = (time.time() - start_time) * 1000 + + result["connections"][db_name] = { + "status": "healthy", + "response_time_ms": round(response_time_ms, 2), + "error": None + } + + except Exception as e: + unhealthy_count += 1 + result["connections"][db_name] = { + "status": "unhealthy", + "response_time_ms": None, + "error": str(e) + } + + total = len(result["connections"]) + if total == 0: + result["status"] = "unhealthy" + result["error"] = "无可用连接" + elif unhealthy_count == total: + result["status"] = "unhealthy" + elif unhealthy_count > 0: + result["status"] = "degraded" + + return result + + +@router.get("/health/database/{db_name}") +async def check_specific_database(db_name: str) -> Dict[str, Any]: + """检查指定数据库连接状态""" + if not Tortoise._inited: + raise HTTPException(status_code=503, detail="数据库服务初始化中") + + try: + conn = Tortoise.get_connection(db_name) + start_time = time.time() + + await conn.execute_query("SELECT 1") + + response_time_ms = (time.time() - start_time) * 1000 + + return { + "db_name": db_name, + "status": "healthy", + "response_time_ms": round(response_time_ms, 2) + } + + except KeyError: + raise HTTPException(status_code=404, detail=f"连接 '{db_name}' 不存在") + except Exception as e: + raise HTTPException(status_code=500, detail=f"数据库连接失败: {e}") + + @router.get("/resource", response_model=ResourceMetrics) async def get_resource_metrics(): """ diff --git a/apps/common/utils/db_helpers.py b/apps/common/utils/db_helpers.py new file mode 100644 index 0000000..3014598 --- /dev/null +++ b/apps/common/utils/db_helpers.py @@ -0,0 +1,48 @@ +from fastapi import HTTPException +from tortoise import Tortoise +from globalobjects import logger as log_config +from core.settings import THIS_DB_NAME +from typing import Optional + + +async def get_db_connection_safely(db_name: Optional[str] = None): + """ + 安全获取数据库连接,包含异常处理和友好提示 + + Args: + db_name: 数据库连接名称,默认使用THIS_DB_NAME + + Returns: + 数据库连接对象 + + Raises: + HTTPException: 数据库连接失败时返回500错误 + """ + if db_name is None: + db_name = THIS_DB_NAME + + try: + if not Tortoise._inited: + log_config.error(f"❌ Tortoise ORM 未初始化,无法获取连接: {db_name}") + raise HTTPException( + status_code=500, + detail="数据库服务初始化失败,请检查服务配置或稍后重试" + ) + + conn = Tortoise.get_connection(db_name) + return conn + + except KeyError: + log_config.error(f"❌ 数据库连接不存在: {db_name}") + raise HTTPException( + status_code=500, + detail="数据库连接配置错误,请联系管理员" + ) + except Exception as e: + if isinstance(e, HTTPException): + raise + log_config.error(f"❌ 获取数据库连接异常: {db_name} - {type(e).__name__}: {e}") + raise HTTPException( + status_code=500, + detail="数据库连接失败,请检查服务配置或稍后重试" + ) diff --git a/apps/data_opt/components/yonyou_tplus.py b/apps/data_opt/components/yonyou_tplus.py index 7dac0e7..3d4ca13 100644 --- a/apps/data_opt/components/yonyou_tplus.py +++ b/apps/data_opt/components/yonyou_tplus.py @@ -203,7 +203,7 @@ class MoPushModel(PydanticModel): """ 整理推送T+MO数据 """ - ExternalCode: str = Field(None) + ExternalCode: Optional[str] = Field(None) BusiType: dict = Field(None) Department: dict = Field(None) Customer: dict = Field(None) @@ -332,9 +332,9 @@ class PrPushModel(PydanticModel): """ 整理推送T+请购单数据 """ - ExternalCode: str = Field(None) - Code: str = Field(None) - VoucherDate: str = Field(None) + ExternalCode: Optional[str] = Field(None) + Code: Optional[str] = Field(None) + VoucherDate: Optional[str] = Field(None) RequisitionPerson: dict = Field(...) PurchaseRequisitionDetails: list[dict] = Field(...) diff --git a/apps/data_opt/mds/staging_routers.py b/apps/data_opt/mds/staging_routers.py index 4129afb..e7eb367 100644 --- a/apps/data_opt/mds/staging_routers.py +++ b/apps/data_opt/mds/staging_routers.py @@ -18,6 +18,7 @@ from .staging_cleaner import StagingProcessor, DataTransformer, STAGING_TABLE_CO from .config_generator import TABLE_DISPLAY_CONFIG, SYSTEM_RUNTIME_CONFIG from apps.io_api.utils.common import standard_response from apps.io_api.utils.db_operation import db_bupsert +from apps.common.utils.db_helpers import get_db_connection_safely from core.settings import MYAPS_MAIN_DB, THIS_DB_NAME, MYAPS_DBSET_LIST from globalobjects import logger as log_config @@ -161,7 +162,7 @@ async def insert_to_staging_table( if exclude_fields is None: exclude_fields = EXCLUDE_FIELDS - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) # 获取字段映射:Python字段名(小写) -> 数据库字段名(大驼峰) field_map = {} @@ -319,7 +320,7 @@ async def sync_to_production( if reset_retry: staging_model = STAGING_MODEL_MAPPING.get(table_name) if staging_model: - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) staging_table_name = staging_model._meta.db_table reset_query = f'UPDATE "{staging_table_name}" SET "_retry_count" = 0 WHERE "_status" = $1' await conn.execute_query(reset_query, ("relation_pass",)) @@ -360,7 +361,7 @@ async def sync_to_production( staging_model = STAGING_MODEL_MAPPING.get(table_name) if staging_model: - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) staging_table_name = staging_model._meta.db_table synced_time = datetime.now(timezone.utc) @@ -726,7 +727,7 @@ async def get_staging_status( raise ValueError(f"未知的缓冲表: {table_name}") # 使用原生SQL查询,确保与同步查询条件一致 - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) table_name_staging = staging_model._meta.db_table stats = {} @@ -790,7 +791,7 @@ async def get_monitor_summary(request: Request): from tortoise import Tortoise from core.settings import THIS_DB_NAME - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) tables = [ "t_material_staging", @@ -858,7 +859,7 @@ async def cleanup_old_data( from datetime import timedelta from core.settings import THIS_DB_NAME - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) cutoff_date = datetime.now() - timedelta(days=days) @@ -1045,7 +1046,7 @@ async def list_staging( raise ValueError(f"未知的缓冲表: {table_name}") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) conditions = [] params = [] @@ -1197,7 +1198,7 @@ async def batch_update_staging( raise ValueError("缺少必要参数: ids或updates") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) field_mapping = {} field_types = {} @@ -1270,7 +1271,7 @@ async def get_staging_detail( raise ValueError(f"未知的缓冲表: {table_name}") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) query = f'SELECT * FROM "{table_name_staging}" WHERE "_staging_id" = $1' result = await conn.execute_query(query, (staging_id,)) @@ -1313,7 +1314,7 @@ async def update_staging( raise ValueError(f"未知的缓冲表: {table_name}") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) field_map = {} field_types = {} @@ -1389,7 +1390,7 @@ async def delete_staging( raise ValueError(f"未知的缓冲表: {table_name}") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" = $1' await conn.execute_query(query, (staging_id,)) @@ -1418,7 +1419,7 @@ async def batch_delete_staging( raise ValueError("staging_ids不能为空") table_name_staging = f"{table_name}_staging" - conn = Tortoise.get_connection(THIS_DB_NAME) + conn = await get_db_connection_safely(THIS_DB_NAME) placeholders = ", ".join([f"${i+1}" for i in range(len(staging_ids))]) query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" IN ({placeholders})' diff --git a/apps/io_api/routers.py b/apps/io_api/routers.py index 7896469..8ca2e29 100644 --- a/apps/io_api/routers.py +++ b/apps/io_api/routers.py @@ -271,7 +271,7 @@ async def post_material( async def run_matver_task(): try: - await post_mat_ver(data=matver_data, db_name=db_name, x_api_key=x_api_key) + await post_mat_ver(request=request, data=matver_data, db_name=db_name, x_api_key=x_api_key) except Exception as e: logger.error(f"Error in post_mat_ver background task: {e}") diff --git a/apps/io_api/schemas.py b/apps/io_api/schemas.py index 87545ba..25774ec 100644 --- a/apps/io_api/schemas.py +++ b/apps/io_api/schemas.py @@ -48,7 +48,7 @@ def _set_raw_input_data(self): class AcceptMaterial(BaseModel): materialno: str = Field(..., description="料号", example="M001") description: str = Field(..., description="物料名称", example="测试物料A") - size: str = Field(None, description="规格", example="100x100mm") + size: Optional[str] = Field(None, description="规格", example="100x100mm") plant: str = Field(pdv.MAT_PLANT, example=pdv.MAT_PLANT, description='工厂') planner: str = Field(pdv.MAT_PLANNER, description="计划员", example="张三") fifo: int = Field(pdv.MAT_FIFO, ge=0, le=1, description='1-FIFO 0-最近原则') @@ -67,19 +67,19 @@ class AcceptMaterial(BaseModel): candelay: gc.YesNoEnum = Field(pdv.MAT_CANDELAY, example="N", description='可否延迟') lotsize: gc.LotSizeEnum = Field(pdv.MAT_LOTSIZE, example="EX", description='批量') lotfix: float = Field(pdv.MAT_LOTFIX, ge=0, description='固定批', example=0.0) - lotmin: float = Field(None, ge=0, description='最小批', example=0.0) - lotmax: float = Field(None, ge=0, description='最大批', example=0.0) + lotmin: float | None = Field(None, ge=0, description='最小批', example=0.0) + lotmax: float | None = Field(None, ge=0, description='最大批', example=0.0) lotround: float = Field(pdv.MAT_LOTROUND, ge=0, description='取整', example=0.0) lotss: float = Field(pdv.MAT_LOTSS, ge=0, description='安全库存', example=0.0) lotpoint: float = Field(pdv.MAT_LOTPOINT, ge=0, description='重订货点', example=0.0) lottop: float = Field(pdv.MAT_LOTTOP, ge=0, description='最大库存点', example=0.0) - planitem: str = Field(None, description='产品组', example="PI001") + planitem: Optional[str] = Field(None, description='产品组', example="PI001") preday: int = Field(pdv.MAT_PREDAY, ge=0, description='向前冲销(天)', example=999) subday: int = Field(pdv.MAT_SUBDAY, ge=0, description='向后冲销(天)', example=999) free1: Optional[str] = Field(None, max_length=255, description='自定义1', example="自定义内容。。。") free2: Optional[str] = Field(None, max_length=255, description='自定义2', example="自定义内容。。。") free3: Optional[str] = Field(None, max_length=255, description='自定义3', example="自定义内容。。。") - memo: str = Field(None, description='备注', example="无特殊要求") + memo: Optional[str] = Field(None, description='备注', example="无特殊要求") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -187,9 +187,9 @@ class AcceptWorkcenter(BaseModel): workcentername: str = Field(..., max_length=255, description="工作中心名称", example="装配车间") pri_wc: int = Field(pdv.WC_PRIORITY, description='优先级', example=1) bottleneck: gc.YesNoEnum = Field(None, example="N", description='瓶颈') - sortno: str = Field(None, max_length=4, description="序号", example="0001") + sortno: Optional[str] = Field(None, max_length=4, description="序号", example="0001") plant: str = Field(pdv.MAT_PLANT, max_length=32, description="工厂", example="1600") - location: str = Field(None, max_length=32, description="车间", example="A区") + location: Optional[str] = Field(None, max_length=32, description="车间", example="A区") finite: gc.YesNoEnum = Field(gc.YesNoEnum.YES, example="N", description='有限') type: gc.YesNoEnum = Field(gc.YesNoEnum.YES, example="N", description="首页显示") capnum: int | None = Field(pdv.WC_CAPNUM, gt=0, description="默认机台数", example=6) @@ -197,7 +197,7 @@ class AcceptWorkcenter(BaseModel): worker: float = Field(pdv.WC_WORKER, ge=0, description='工时', example=8.0) setupno: str | None = Field(None, max_length=6, description='切换组别', example="S001") grpno: str | None = Field(None, max_length=6, description='同组号', example="G001") - memo: str = Field(None, max_length=255, description="备注", example="标准工作中心") + memo: Optional[str] = Field(None, max_length=255, description="备注", example="标准工作中心") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -280,7 +280,7 @@ class AcceptWorkcenter(BaseModel): class AcceptMatWc(BaseModel): materialno: str = Field(..., max_length=64, description='料号', example="M001") matver: str = Field(..., max_length=4, example=pdv.MATVER, description='产线版本') - itemno: str = Field(None, max_length=6, description='工序项目', example=pdv.ITEMNO) + itemno: Optional[str] = Field(None, max_length=6, description='工序项目', example=pdv.ITEMNO) workcenter: str = Field(..., max_length=32, description='工作中心', example="WC001") sortno: int = Field(..., ge=0, le=999, description='序号', example=1) basesec: float = Field(..., ge=0, description='节拍T/T(秒/100)', example=600) @@ -289,7 +289,7 @@ class AcceptMatWc(BaseModel): sf: gc.SfEnum = Field(gc.SfEnum.F, example="F", description='并行S/串行F') offsetsec: int = Field(0, description='偏置+/-(秒)', example=0) rate: float = Field(pdv.MATWC_RATE, ge=0, description='配比', example=pdv.MATWC_RATE) - memo: str = Field(None, max_length=255, description='备注', example="标准工序") + memo: Optional[str] = Field(None, max_length=255, description='备注', example="标准工序") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -321,7 +321,7 @@ class AcceptMatWc(BaseModel): values["sortno"] = int(float(values["sortno"])) except: values["sortno"] = None - if values.get("itemno") in gc.NONE_AND_EMPTY and values["sortno"]: + if values.get("itemno") in gc.NONE_AND_EMPTY and "sortno" in values: values["itemno"] = f"{pdv.itemno_prefix}{values['sortno']:0{pdv.itemno_width}d}" try: values["basesec"] = float(values["basesec"]) @@ -367,9 +367,9 @@ class AcceptMatVer(BaseModel): lotfrom: int = Field(pdv.MATVER_LOTFROM, description='批量起点', example=1) lotto: int = Field(pdv.MATVER_LOTTO, description='批量终点', example=9999999) priority: int = Field(pdv.MATVER_PRIORITY, description='优先级', example=1) - refno: str = Field(None, max_length=64, description='MTO订单号/认证线', example="SO123456") + refno: Optional[str] = Field(None, max_length=64, description='MTO订单号/认证线', example="SO123456") active: gc.YesNoEnum = Field(gc.YesNoEnum.YES, example="Y", description='生效') - memo: str = Field(None, max_length=255, description='备注', example="标准版本") + memo: Optional[str] = Field(None, max_length=255, description='备注', example="标准版本") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -433,7 +433,7 @@ class AcceptMatWcBom(BaseModel): mto: gc.YesNoEnum = Field(gc.YesNoEnum.NO, example="N", description='MTO') scrap: float = Field(0, ge=0, description='报废率%', example=0.0) alt: gc.YesNoEnum = Field(gc.YesNoEnum.NO, example="N", description='是否是替代') - memo: str = Field(None, max_length=255, description='备注', example="标准BOM组件") + memo: Optional[str] = Field(None, max_length=255, description='备注', example="标准BOM组件") denominator: Optional[float | str] = Field(None, description='用量分母', example=1) _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) @@ -500,7 +500,7 @@ class AcceptMold(BaseModel): status: str = Field(..., max_length=6, description='状态', example="AVL") moldnum: int = Field(..., ge=1, description='模具穴数', example=4) qty: int = Field(..., gt=1, description='模具台数', example=2) - memo: str = Field(None, max_length=255, description="备注", example="标准模具") + memo: Optional[str] = Field(None, max_length=255, description="备注", example="标准模具") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -549,7 +549,7 @@ class AcceptMatWcMold(BaseModel): basesec: float = Field(..., ge=0, description='节拍T/T(秒/100)', example=600) fixsec: int = Field(0, ge=0, description='额定时间(秒)', example=300) priority: int = Field(..., description='优先级', example=1) - memo: str = Field(None, max_length=255, description='备注', example="标准机台模具配置") + memo: Optional[str] = Field(None, max_length=255, description='备注', example="标准机台模具配置") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: @@ -602,7 +602,7 @@ class AcceptSupply(BaseModel): materialno: str = Field(..., max_length=64, description='料号', example="M001") supplyno: str = Field(..., max_length=64, description='供应单号', example="MO123456") matver: Optional[str] = Field(None, max_length=32, example=pdv.MATVER, description='产线版本') - itemno: str = Field(None, max_length=6, description='项目号', example=pdv.ITEMNO) + itemno: Optional[str] = Field(None, max_length=6, description='项目号', example=pdv.ITEMNO) type: gc.SupplyTypeEnum = Field(..., example="MO", description='类型 PL-生产计划 MO-生产工单 ST-库存 PO-采购订单') category: gc.ProductCategoryEnum = Field(gc.ProductCategoryEnum.MTS, example="MTS", description='分类(MTO/MTS)') priority: int = Field(0, description='优先级', example=0) @@ -700,7 +700,7 @@ class AcceptSupply(BaseModel): class ModifySupply(BaseModel): - supplyno: str = Field(None, max_length=64, description='供应号', example="MO123456") + supplyno: Optional[str] = Field(None, max_length=64, description='供应号', example="MO123456") status: gc.OrderStatusEnum = Field(None, example="CRE", description=f'状态 {list(gc.OrderStatusEnum.__members__.keys())}') avail_qty: Optional[float] = Field(None, ge=0, description='可用数量', example=100.0) @@ -754,7 +754,7 @@ class AcceptDemand(BaseModel): type: gc.DemandTypeEnum = Field(..., example="SO", description='类型 SO-销售订单 DM-计划需求 RS-工单预留 FC-预测 SS-安全库存') category: gc.ProductCategoryEnum = Field(gc.ProductCategoryEnum.MTS, example="MTS", description='分类(MTO/MTS)') priority: int = Field(..., description='优先级', example=1) - workcenter: str = Field(None, max_length=32, description='工作中心', example="WC001") + workcenter: Optional[str] = Field(None, max_length=32, description='工作中心', example="WC001") status: gc.OrderStatusEnum = Field("CRE", example="CRE", description=f'状态 {list(gc.OrderStatusEnum.__members__.keys())}') req_qty: float = Field(..., description='需求数量(须为负数,若输入正数则自动转为负数)', example=-100.0) req_date: datetime = Field(..., description='需求日期', example="2023-01-07T10:00:00") @@ -876,7 +876,7 @@ class AcceptConfirm(BaseModel): recordqty: float = Field(..., description='报工数量', gt=0, example=100) recorddt: datetime = Field(..., description='报工日期', example="2025-01-07 10:00:00") status: gc.YesNoEnum = Field(..., description='状态') - sysuser: str = Field(None, max_length=32, description='系统用户', example="张三") + sysuser: Optional[str] = Field(None, max_length=32, description='系统用户', example="张三") _raw_input_data: Dict[str, Any] = PrivateAttr(default=None) class Config: diff --git a/core/database.py b/core/database.py index 9efb031..cf16ed5 100644 --- a/core/database.py +++ b/core/database.py @@ -87,6 +87,14 @@ TORTOISE_ORM_CONFIG = { } if THIS_DB_NAME: + model_path = "apps.data_opt.mds.staging_models" + try: + __import__(model_path) + log_config.info(f"✅ 模型模块导入成功: {model_path}") + except ImportError as e: + log_config.error(f"❌ 模型模块导入失败: {model_path} - {e}") + raise + connections[THIS_DB_NAME] = { "engine": "tortoise.backends.asyncpg", "credentials": { @@ -95,16 +103,106 @@ if THIS_DB_NAME: "user": THIS_DB_USER, "password": THIS_DB_PASSWORD, "database": THIS_DB_NAME, - "server_settings": {"TimeZone": TIMEZONE_NAME}, + "server_settings": { + "TimeZone": TIMEZONE_NAME, + "application_name": "myaps_api", + }, + "command_timeout": 60, + "timeout": 30, }, "min_size": 3, "max_size": 10, "use_tz": True, + "pool_recycle": 1800, } + + log_config.info(f"✅ PostgreSQL连接配置完成: {THIS_DB_NAME}@{THIS_DB_HOST}:{THIS_DB_PORT}") + TORTOISE_ORM_CONFIG["apps"]["data_opt_models"] = { - "models": ["apps.data_opt.mds.staging_models"], + "models": [model_path], "default_connection": THIS_DB_NAME, } + + log_config.info(f"✅ 模型注册完成: data_opt_models -> {THIS_DB_NAME}") + + +def validate_database_config() -> Dict[str, Any]: + """ + 验证数据库配置完整性和一致性 + + 多租户场景说明: + - THIS_DB_* 为空是正常情况,表示该租户不使用自有数据库 + - 仅当 THIS_DB_NAME 有值时,才验证配套参数的完整性 + + Returns: + 配置摘要字典 + + Raises: + ValueError: 配置验证失败时抛出(仅在 THIS_DB_NAME 有值时) + """ + import json + + issues = [] + warnings = [] + + if not THIS_DB_NAME: + log_config.info("ℹ️ 该租户未配置自有数据库(THIS_DB_NAME为空),跳过 PostgreSQL 配置验证") + config_summary = { + "has_own_database": False, + "timezone": TIMEZONE_NAME, + "connections": list(connections.keys()), + "apps": list(TORTOISE_ORM_CONFIG["apps"].keys()), + } + log_config.info(f"配置摘要: {json.dumps(config_summary, indent=2, ensure_ascii=False)}") + return config_summary + + log_config.info(f"✓ 检测到自有数据库配置: THIS_DB_NAME={THIS_DB_NAME}") + + required_vars = { + "THIS_DB_HOST": THIS_DB_HOST, + "THIS_DB_PORT": THIS_DB_PORT, + "THIS_DB_USER": THIS_DB_USER, + "THIS_DB_PASSWORD": THIS_DB_PASSWORD, + } + + for var_name, var_value in required_vars.items(): + if not var_value: + issues.append(f"{var_name} 环境变量未设置") + + if THIS_DB_PORT and not (1 <= THIS_DB_PORT <= 65535): + issues.append(f"THIS_DB_PORT={THIS_DB_PORT} 超出有效范围(1-65535)") + + if THIS_DB_NAME not in connections: + issues.append(f"THIS_DB_NAME='{THIS_DB_NAME}' 未在connections配置中找到") + + try: + __import__("apps.data_opt.mds.staging_models") + except ImportError as e: + warnings.append(f"模型路径导入警告: apps.data_opt.mds.staging_models - {e}") + + if issues: + error_msg = "自有数据库配置验证失败:\n" + "\n".join(f" ❌ {issue}" for issue in issues) + log_config.error(error_msg) + raise ValueError(error_msg) + + if warnings: + for warning in warnings: + log_config.warning(warning) + + config_summary = { + "has_own_database": True, + "db_name": THIS_DB_NAME, + "db_host": THIS_DB_HOST, + "db_port": THIS_DB_PORT, + "timezone": TIMEZONE_NAME, + "connections": list(connections.keys()), + "apps": list(TORTOISE_ORM_CONFIG["apps"].keys()), + } + + log_config.info("✅ 数据库配置验证通过") + log_config.info(f"配置摘要: {json.dumps(config_summary, indent=2, ensure_ascii=False)}") + + return config_summary class ConnectionLeakDetector: @@ -308,32 +406,49 @@ smart_pool_manager = SmartConnectionPoolManager() def register_database(app): + """ + 注册Tortoise ORM到FastAPI应用(兼容接口) + + 注意:此函数作为兼容接口保留,实际初始化已移到 lifespan 中 + """ + + validate_database_config() + register_tortoise( app=app, config=TORTOISE_ORM_CONFIG, + generate_schemas=False, + add_exception_handlers=True, ) - # 标记数据库已初始化,允许日志写入数据库 - # 使用统一函数,同时设置 V1 和 V2 + log_config.info("✅ Tortoise ORM 已注册到FastAPI应用") + log_config.info(f"连接配置: {list(TORTOISE_ORM_CONFIG['connections'].keys())}") + log_config.info(f"应用配置: {list(TORTOISE_ORM_CONFIG['apps'].keys())}") + from globalobjects.logger import set_db_initialized_unified set_db_initialized_unified(True) - # 启动监控服务(使用现有的监控架构) from apps.common.monitor.service import monitor_service log_config.info("✅ 系统监控服务已集成") async def warmup_connections(): - """预热数据库连接""" + """ + 预热数据库连接,增强容错处理 + MySQL 连接失败不阻止应用启动 + """ if not MYAPS_MAIN_DB: return try: from globalobjects.db_manager import get_db_managers db_managers = get_db_managers() for db_name, manager in db_managers.items(): + conn_config = TORTOISE_ORM_CONFIG["connections"].get(db_name, {}) + engine = conn_config.get("engine", "") + is_mysql = "mysql" in engine + try: start_time = time.time() - # 使用较短的超时时间,避免启动时被阻塞 is_healthy = await asyncio.wait_for( manager.check_connection_health(timeout=5, fast_mode=True), timeout=10 @@ -342,16 +457,24 @@ async def warmup_connections(): if is_healthy: log_config.info(f"连接预热成功: {db_name} - 响应时间: {response_time:.3f}秒") else: - log_config.warning(f"连接预热失败: {db_name}") - # 尝试刷新连接(使用快速模式) - await asyncio.wait_for( - manager.refresh_connection(fast_mode=True), - timeout=15 - ) + if is_mysql: + log_config.warning(f"⚠️ MySQL连接预热失败: {db_name}(不影响启动)") + else: + log_config.warning(f"连接预热失败: {db_name}") + await asyncio.wait_for( + manager.refresh_connection(fast_mode=True), + timeout=15 + ) except asyncio.TimeoutError: - log_config.warning(f"连接预热超时: {db_name},跳过预热") + if is_mysql: + log_config.warning(f"⚠️ MySQL连接预热超时: {db_name},跳过(不影响启动)") + else: + log_config.warning(f"连接预热超时: {db_name},跳过预热") except Exception as e: - log_config.error(f"连接预热异常: {db_name} - {str(e)}") + if is_mysql: + log_config.warning(f"⚠️ MySQL连接预热异常: {db_name} - {str(e)}(不影响启动)") + else: + log_config.error(f"❌ 连接预热异常: {db_name} - {str(e)}") log_config.info("数据库连接预热完成") except Exception as e: log_config.error(f"连接预热异常: {str(e)}") diff --git a/core/lifespan.py b/core/lifespan.py index 1b79009..bcfafbc 100644 --- a/core/lifespan.py +++ b/core/lifespan.py @@ -22,18 +22,17 @@ from core.database import check_db_connections, warmup_connections, start_pool_m @asynccontextmanager async def lifespan(app): """应用生命周期管理器""" - # 应用启动时执行的操作 log_config.initialize_logging_unified() - # 将主应用事件循环传递给调度器 main_loop = asyncio.get_running_loop() scheduler_manager.set_main_loop(main_loop) log_config.info(f"已将主应用事件循环传递给调度器: {main_loop}") - # 预热数据库连接(在启动其他服务之前) + from core.database import validate_database_config + validate_database_config() + log_config.info("开始预热数据库连接...") try: - # 添加超时保护,避免启动时被阻塞 await asyncio.wait_for(warmup_connections(), timeout=60) log_config.info("数据库连接预热完成") except asyncio.TimeoutError: @@ -41,12 +40,10 @@ async def lifespan(app): except Exception as e: log_config.error(f"❌ 数据库连接预热失败: {e}") - # 启动资源监控 log_config.info("开始启动资源监控...") resource_monitor.start_monitoring(interval=30) log_config.info("系统资源监控已启动") - # 等待服务器完全就绪,确保客户端可以正常连接 log_config.info("等待服务器完全就绪...") await asyncio.sleep(1) log_config.info("服务器已就绪") @@ -348,6 +345,15 @@ async def lifespan(app): # 应用关闭时执行的操作 log_config.info("应用关闭中...") + # 0. 关闭数据库连接 + log_config.info("正在关闭数据库连接...") + try: + from tortoise import Tortoise + await Tortoise.close_connections() + log_config.info("✅ 数据库连接已关闭") + except Exception as e: + log_config.warning(f"⚠️ 关闭数据库连接时出错: {e}") + # 1. 先停止 MySQL Binlog 监控(最依赖数据库) if TURNON_BINLOG_LISTENER: log_config.info("正在停止 MySQL Binlog 监控...") diff --git a/docs/database_optimization_todo.md b/docs/database_optimization_todo.md new file mode 100644 index 0000000..41ea94d --- /dev/null +++ b/docs/database_optimization_todo.md @@ -0,0 +1,387 @@ +# 数据库初始化优化 TODO List + +> 生成时间: 2026-05-20 +> 目标: 解决 Tortoise ORM 初始化问题,确保数据库连接稳定 + +--- + +## P0 - 紧急 (导致所有数据库操作失败) + +### TODO-001: Tortoise ORM 初始化未完成 + +**问题描述**: +- 错误: `RuntimeError: No TortoiseContext is currently active` +- 影响: 所有数据库操作失败 + +**问题位置**: +- `main.py:81` → `core/database.py:310-314` + +**排查步骤**: +1. 检查 `register_tortoise()` 是否正确执行 +2. 验证 `TORTOISE_ORM_CONFIG` 配置是否完整 +3. 确认异步上下文是否正确 + +**解决方案**: +- [ ] 添加初始化日志,确认 `register_tortoise()` 执行时机 +- [ ] 检查 FastAPI lifespan 与 Tortoise 的集成方式 +- [ ] 考虑使用 `register_tortoise()` 的回调模式确保初始化完成 + +**验证方法**: +```python +# 在 register_database 后添加测试 +from tortoise import Tortoise +assert Tortoise._inited, "Tortoise 未初始化" +``` + +--- + +## P1 - 高优先级 + +### TODO-002: 环境变量 THIS_DB_NAME 可能未正确加载 + +**问题描述**: +- 如果 `THIS_DB_NAME` 为空,PostgreSQL 连接不会被添加到配置中 +- 导致 `Tortoise.get_connection(THIS_DB_NAME)` 失败 + +**问题位置**: +- `core/settings.py:170` +- `core/database.py:89` + +**排查步骤**: +1. 检查 `.env` 文件中 `THIS_DB_NAME` 是否设置 +2. 验证环境变量加载顺序 +3. 添加默认值或错误提示 + +**解决方案**: +- [ ] 在 `core/database.py:89` 添加日志输出 `THIS_DB_NAME` 的值 +- [ ] 如果为空,抛出明确的配置错误 +- [ ] 考虑添加配置验证函数 + +**代码改进**: +```python +if not THIS_DB_NAME: + raise ValueError("THIS_DB_NAME 环境变量未设置,请检查 .env 配置") +``` + +--- + +### TODO-003: register_tortoise 缺少关键参数 + +**问题描述**: +- `register_tortoise()` 调用时缺少可选参数 +- 可能导致初始化不完整 + +**问题位置**: +- `core/database.py:310-314` + +**当前代码**: +```python +register_tortoise( + app=app, + config=TORTOISE_ORM_CONFIG, +) +``` + +**解决方案**: +- [ ] 添加 `generate_schemas=False` (明确不自动生成表结构) +- [ ] 添加 `add_exception_handlers=True` (添加异常处理器) +- [ ] 添加 `_create_db=False` (不自动创建数据库) + +**改进代码**: +```python +register_tortoise( + app=app, + config=TORTOISE_ORM_CONFIG, + generate_schemas=False, + add_exception_handlers=True, +) +``` + +--- + +### TODO-004: 数据库模型注册验证 + +**问题描述**: +- 模型路径或连接名称可能配置错误 +- 导致模型无法正确映射到数据库连接 + +**问题位置**: +- `core/database.py:104-107` + +**当前配置**: +```python +TORTOISE_ORM_CONFIG["apps"]["data_opt_models"] = { + "models": ["apps.data_opt.mds.staging_models"], + "default_connection": THIS_DB_NAME, +} +``` + +**排查步骤**: +1. 验证模型路径是否正确 +2. 确认 `THIS_DB_NAME` 与 `connections` 字典中的键匹配 +3. 检查模型是否能正确导入 + +**解决方案**: +- [ ] 添加模型导入验证 +- [ ] 添加连接名称匹配验证 +- [ ] 输出完整的 TORTOISE_ORM_CONFIG 供调试 + +--- + +## P2 - 中优先级 + +### TODO-005: PostgreSQL 连接缺少超时配置 + +**问题描述**: +- PostgreSQL 连接配置缺少 `connect_timeout` +- 可能导致连接建立缓慢或挂起 + +**问题位置**: +- `core/database.py:89-103` + +**当前配置**: +```python +connections[THIS_DB_NAME] = { + "engine": "tortoise.backends.asyncpg", + "credentials": { + "host": THIS_DB_HOST, + "port": THIS_DB_PORT, + "user": THIS_DB_USER, + "password": THIS_DB_PASSWORD, + "database": THIS_DB_NAME, + "server_settings": {"TimeZone": TIMEZONE_NAME}, + }, + "min_size": 3, + "max_size": 10, + "use_tz": True, +} +``` + +**解决方案**: +- [ ] 添加 `connect_timeout` 参数 (建议 30 秒) +- [ ] 添加 `command_timeout` 参数 (建议 60 秒) + +**改进代码**: +```python +connections[THIS_DB_NAME] = { + "engine": "tortoise.backends.asyncpg", + "credentials": { + "host": THIS_DB_HOST, + "port": THIS_DB_PORT, + "user": THIS_DB_USER, + "password": THIS_DB_PASSWORD, + "database": THIS_DB_NAME, + "server_settings": {"TimeZone": TIMEZONE_NAME}, + "command_timeout": 60, + }, + "min_size": 3, + "max_size": 10, + "use_tz": True, +} +# 注意: asyncpg 的 timeout 在 credentials 中 +``` + +--- + +### TODO-006: 数据库操作缺少异常处理 + +**问题描述**: +- 获取数据库连接时无异常处理 +- 一旦 Tortoise 未初始化直接崩溃 + +**问题位置**: +- `apps/data_opt/mds/staging_routers.py:728-729` + +**当前代码**: +```python +conn = Tortoise.get_connection(THIS_DB_NAME) +``` + +**解决方案**: +- [ ] 添加 try-catch 保护 +- [ ] 提供友好的错误提示 +- [ ] 记录详细的错误日志 + +**改进代码**: +```python +try: + conn = Tortoise.get_connection(THIS_DB_NAME) +except Exception as e: + logger.error(f"获取数据库连接失败: {e}") + raise HTTPException( + status_code=500, + detail="数据库连接失败,请检查服务配置或稍后重试" + ) +``` + +--- + +### TODO-007: lifespan 与 register_tortoise 的交互问题 + +**问题描述**: +- `lifespan` 在 `register_database` 之前就关联到 app +- 可能导致初始化时序问题 + +**问题位置**: +- `main.py:31` (create_app) +- `main.py:81` (register_database) + +**当前顺序**: +```python +app = create_app(lifespan=lifespan) # 第31行 +# ... 其他初始化 ... +register_database(app) # 第81行 +``` + +**排查步骤**: +1. 理解 `register_tortoise` 的初始化时机 +2. 确认 lifespan 启动时数据库是否已就绪 +3. 检查是否有竞态条件 + +**解决方案**: +- [ ] 考虑将 `register_database` 移到 `create_app` 内部 +- [ ] 或在 lifespan 的 startup 阶段添加数据库就绪检查 +- [ ] 添加初始化状态标志 + +--- + +## P3 - 低优先级 + +### TODO-008: MySQL 数据库连接超时 (非阻塞) + +**问题描述**: +- MySQL 数据库 `hacy_p` 连接失败 +- 但这是警告,不影响 PostgreSQL 连接 + +**日志信息**: +``` +数据库连接健康检查超时: hacy_p +连接预热超时: hacy_p,跳过预热 +``` + +**解决方案**: +- [ ] 检查 MySQL 数据库配置是否正确 +- [ ] 添加更友好的警告提示 +- [ ] 考虑是否需要 MySQL 连接 (如果不使用可禁用) + +--- + +### TODO-009: 添加数据库连接状态检查端点 + +**问题描述**: +- 缺少专门的数据库健康检查 API +- 难以快速判断数据库连接状态 + +**解决方案**: +- [ ] 添加 `/health/database` 端点 +- [ ] 返回所有数据库连接状态 +- [ ] 包含连接池使用情况 + +**示例代码**: +```python +@router.get("/health/database") +async def check_database_health(): + from tortoise import Tortoise + results = {} + for db_name in Tortoise._connections: + try: + conn = Tortoise.get_connection(db_name) + await conn.execute_query("SELECT 1") + results[db_name] = {"status": "healthy"} + except Exception as e: + results[db_name] = {"status": "unhealthy", "error": str(e)} + return results +``` + +--- + +### TODO-010: 添加配置验证函数 + +**问题描述**: +- 数据库配置分散在多处 +- 缺少统一的验证机制 + +**解决方案**: +- [ ] 创建 `validate_database_config()` 函数 +- [ ] 在应用启动时调用 +- [ ] 输出配置摘要供调试 + +**示例代码**: +```python +def validate_database_config(): + """验证数据库配置""" + issues = [] + + if not THIS_DB_NAME: + issues.append("THIS_DB_NAME 未设置") + + if THIS_DB_NAME and THIS_DB_NAME not in connections: + issues.append(f"{THIS_DB_NAME} 未在 connections 中配置") + + if issues: + raise ValueError("数据库配置错误:\n" + "\n".join(issues)) + + logger.info("数据库配置验证通过") +``` + +--- + +## 执行计划 + +### 阶段1: 紧急修复 (立即执行) +1. [ ] TODO-001: Tortoise 初始化问题 +2. [ ] TODO-002: 环境变量验证 +3. [ ] TODO-003: register_tortoise 参数 + +### 阶段2: 稳定性优化 (本周内) +4. [ ] TODO-004: 模型注册验证 +5. [ ] TODO-005: PostgreSQL 超时配置 +6. [ ] TODO-006: 异常处理 + +### 阶段3: 架构优化 (下周) +7. [ ] TODO-007: lifespan 集成 +8. [ ] TODO-008: MySQL 连接处理 +9. [ ] TODO-009: 健康检查端点 +10. [ ] TODO-010: 配置验证 + +--- + +## 调试命令 + +### 检查环境变量 +```bash +cat .env | grep -E "THIS_DB_|MYAPS_DB" +``` + +### 测试数据库连接 +```bash +# PostgreSQL +PGPASSWORD=123456 psql -h 129.211.172.205 -p 5432 -U n8n -d appsmith -c "SELECT 1" + +# MySQL (如果使用) +mysql -h -P -u -p -e "SELECT 1" +``` + +### 检查 Tortoise 初始化 +```python +# 在应用启动后执行 +from tortoise import Tortoise +print(f"Tortoise initialized: {Tortoise._inited}") +print(f"Connections: {list(Tortoise._connections.keys())}") +``` + +--- + +## 参考资料 + +- [Tortoise ORM 官方文档](https://tortoise.github.io/tortoise-orm/) +- [FastAPI 生命周期事件](https://fastapi.tiangolo.com/advanced/events/) +- [asyncpg 连接参数](https://magicstack.github.io/asyncpg/current/api/index.html#connection) + +--- + +## 变更记录 + +| 日期 | 操作 | 说明 | +|------|------|------| +| 2026-05-20 | 创建 | 初始化数据库优化 TODO List | diff --git a/main.py b/main.py index 94d1126..4d15ea7 100644 --- a/main.py +++ b/main.py @@ -77,7 +77,7 @@ init_registered_routes(app) app.websocket("/")(websocket_root) app.websocket("/ws/{path:path}")(websocket_endpoint) -# 注册数据库 +# 注册数据库(通过 register_tortoise 管理 context) register_database(app) # 启动说明: