合并db_init_manager

This commit is contained in:
2026-05-21 21:47:19 +08:00
parent 784a3f68cf
commit f785f23389
7 changed files with 206 additions and 206 deletions
-4
View File
@@ -35,10 +35,6 @@ REDIS_PASSWORD=
TURNON_BINLOG_LISTENER=false
TRUNON_SCHEDULER=false
# Staging模式配置
# 用于数据清洗模式的特殊数据库名称标识
STAGING_DB_NAME=--s
# 日志配置
LOG_RETENTION=5
USE_LOGURU=true
+1 -1
View File
@@ -26,7 +26,7 @@ async def get_db_connection_safely(db_name: Optional[str] = None, max_wait: floa
try:
if not Tortoise._inited:
# 使用智能等待管理器
from core.db_init_manager import db_init_manager
from core.database import db_init_manager
log_config.info(f"⏳ 等待数据库初始化完成: {db_name}")
result = await db_init_manager.wait_for_init(max_wait=max_wait)
+1 -1
View File
@@ -81,7 +81,7 @@ def map_staging_response_to_direct(staging_response: dict) -> dict:
async def dispatch_to_staging(
table_key: str,
data: List,
source_system: str = "unknown",
source_system: str = "API",
dedup_strategy: DedupStrategyEnum = DedupStrategyEnum.OVERWRITE,
update_mode: UpdateModeEnum = UpdateModeEnum.PARTIAL
) -> dict:
+198 -2
View File
@@ -1,3 +1,7 @@
"""
数据库配置和初始化管理
包含 Tortoise ORM 配置连接池管理初始化状态管理器
"""
import time
import asyncio
from collections import defaultdict
@@ -13,6 +17,200 @@ from core.settings import (
)
from globalobjects import logger as log_config
# ============================================================================
# 数据库初始化状态管理器
# ============================================================================
class DatabaseInitManager:
"""
数据库初始化状态管理器
解决 Tortoise ORM 异步初始化的竞态条件
特性
1. 事件驱动初始化完成后主动通知等待者
2. 实际检查测试真实连接而非仅检查标志位
3. 超时保护避免无限等待
4. 状态追踪记录初始化进度和耗时
"""
_instance: Optional['DatabaseInitManager'] = None
def __init__(self):
# 初始化完成事件
self._init_event = asyncio.Event()
# 初始化开始时间
self._start_time: Optional[datetime] = None
# 初始化完成时间
self._end_time: Optional[datetime] = None
# 是否已初始化
self._initialized = False
# 初始化失败的错误信息
self._error: Optional[Exception] = None
# 连接名称列表
self._connection_names: list = []
# 已成功建立的连接数
self._ready_connections: int = 0
@classmethod
def get_instance(cls) -> 'DatabaseInitManager':
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def start_init(self, connection_names: list):
"""标记初始化开始"""
self._start_time = datetime.now()
self._connection_names = connection_names
log_config.info(f"🔹 数据库初始化开始: {connection_names}")
def mark_initialized(self):
"""标记初始化完成"""
self._initialized = True
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.info(f"✅ 数据库初始化完成,耗时: {elapsed:.2f}")
def mark_error(self, error: Exception):
"""标记初始化失败"""
self._error = error
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.error(f"❌ 数据库初始化失败,耗时: {elapsed:.2f}秒: {error}")
def mark_connection_ready(self, conn_name: str):
"""标记某个连接已就绪"""
self._ready_connections += 1
log_config.debug(f" ✓ 连接就绪: {conn_name} ({self._ready_connections}/{len(self._connection_names)})")
async def wait_for_init(
self,
max_wait: float = 30.0,
check_interval: float = 0.1,
early_exit_check: Optional[callable] = None
) -> Dict[str, Any]:
"""
等待初始化完成
Args:
max_wait: 最大等待时间
check_interval: 检查间隔
early_exit_check: 提前退出检查函数返回True则提前结束
Returns:
{
"success": bool,
"elapsed": float,
"ready_connections": int,
"error": Optional[Exception]
}
"""
start = datetime.now()
# 如果已经初始化完成,立即返回
if self._initialized:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
# 如果已经失败,立即返回
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 等待初始化事件,带超时
try:
await asyncio.wait_for(
self._init_event.wait(),
timeout=max_wait
)
except asyncio.TimeoutError:
elapsed = (datetime.now() - start).total_seconds()
log_config.warning(f"⚠️ 数据库初始化等待超时: {elapsed:.2f}")
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": TimeoutError(f"初始化超时({max_wait}秒)")
}
# 检查是否有错误
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 执行额外检查(如实际连接测试)
if early_exit_check:
try:
check_result = await early_exit_check()
if not check_result:
log_config.warning("⚠️ 初始化完成,但连接检查未通过")
except Exception as e:
log_config.warning(f"⚠️ 连接检查失败: {e}")
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
@property
def is_initialized(self) -> bool:
"""检查是否已初始化"""
return self._initialized
@property
def init_elapsed(self) -> Optional[float]:
"""获取初始化耗时"""
if self._start_time and self._end_time:
return (self._end_time - self._start_time).total_seconds()
return None
def get_status(self) -> Dict[str, Any]:
"""获取当前状态"""
return {
"initialized": self._initialized,
"start_time": self._start_time.isoformat() if self._start_time else None,
"end_time": self._end_time.isoformat() if self._end_time else None,
"elapsed": self.init_elapsed,
"connection_names": self._connection_names,
"ready_connections": self._ready_connections,
"error": str(self._error) if self._error else None
}
# 全局单例
db_init_manager = DatabaseInitManager.get_instance()
# ============================================================================
# 数据库连接配置
# ============================================================================
# 计算连接池大小:根据账套数量动态调整,避免连接总数过多
# 总连接数 = 账套数 × maxsize,应控制在合理范围内(建议不超过150)
import os
@@ -425,7 +623,6 @@ def register_database(app):
validate_database_config()
# 标记初始化开始
from core.db_init_manager import db_init_manager
connection_names = list(TORTOISE_ORM_CONFIG['connections'].keys())
db_init_manager.start_init(connection_names)
@@ -440,7 +637,6 @@ def register_database(app):
@app.on_event("startup")
async def notify_db_init_complete():
from tortoise import Tortoise
from core.db_init_manager import db_init_manager
# 等待Tortoise完成初始化(通常register_tortoise已经确保这一点)
max_check = 50
-191
View File
@@ -1,191 +0,0 @@
"""
数据库初始化状态管理器
解决 Tortoise ORM 异步初始化的竞态条件
"""
import asyncio
from typing import Optional, Dict, Any
from datetime import datetime
from globalobjects import logger as log_config
class DatabaseInitManager:
"""
数据库初始化状态管理器
特性
1. 事件驱动初始化完成后主动通知等待者
2. 实际检查测试真实连接而非仅检查标志位
3. 超时保护避免无限等待
4. 状态追踪记录初始化进度和耗时
"""
_instance: Optional['DatabaseInitManager'] = None
def __init__(self):
# 初始化完成事件
self._init_event = asyncio.Event()
# 初始化开始时间
self._start_time: Optional[datetime] = None
# 初始化完成时间
self._end_time: Optional[datetime] = None
# 是否已初始化
self._initialized = False
# 初始化失败的错误信息
self._error: Optional[Exception] = None
# 连接名称列表
self._connection_names: list = []
# 已成功建立的连接数
self._ready_connections: int = 0
@classmethod
def get_instance(cls) -> 'DatabaseInitManager':
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def start_init(self, connection_names: list):
"""标记初始化开始"""
self._start_time = datetime.now()
self._connection_names = connection_names
log_config.info(f"🔹 数据库初始化开始: {connection_names}")
def mark_initialized(self):
"""标记初始化完成"""
self._initialized = True
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.info(f"✅ 数据库初始化完成,耗时: {elapsed:.2f}")
def mark_error(self, error: Exception):
"""标记初始化失败"""
self._error = error
self._end_time = datetime.now()
self._init_event.set()
if self._start_time:
elapsed = (self._end_time - self._start_time).total_seconds()
log_config.error(f"❌ 数据库初始化失败,耗时: {elapsed:.2f}秒: {error}")
def mark_connection_ready(self, conn_name: str):
"""标记某个连接已就绪"""
self._ready_connections += 1
log_config.debug(f" ✓ 连接就绪: {conn_name} ({self._ready_connections}/{len(self._connection_names)})")
async def wait_for_init(
self,
max_wait: float = 30.0,
check_interval: float = 0.1,
early_exit_check: Optional[callable] = None
) -> Dict[str, Any]:
"""
等待初始化完成
Args:
max_wait: 最大等待时间
check_interval: 检查间隔
early_exit_check: 提前退出检查函数返回True则提前结束
Returns:
{
"success": bool,
"elapsed": float,
"ready_connections": int,
"error": Optional[Exception]
}
"""
start = datetime.now()
# 如果已经初始化完成,立即返回
if self._initialized:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
# 如果已经失败,立即返回
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 等待初始化事件,带超时
try:
await asyncio.wait_for(
self._init_event.wait(),
timeout=max_wait
)
except asyncio.TimeoutError:
elapsed = (datetime.now() - start).total_seconds()
log_config.warning(f"⚠️ 数据库初始化等待超时: {elapsed:.2f}")
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": TimeoutError(f"初始化超时({max_wait}秒)")
}
# 检查是否有错误
if self._error:
elapsed = (datetime.now() - start).total_seconds()
return {
"success": False,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": self._error
}
# 执行额外检查(如实际连接测试)
if early_exit_check:
try:
check_result = await early_exit_check()
if not check_result:
log_config.warning("⚠️ 初始化完成,但连接检查未通过")
except Exception as e:
log_config.warning(f"⚠️ 连接检查失败: {e}")
elapsed = (datetime.now() - start).total_seconds()
return {
"success": True,
"elapsed": elapsed,
"ready_connections": self._ready_connections,
"error": None
}
@property
def is_initialized(self) -> bool:
"""检查是否已初始化"""
return self._initialized
@property
def init_elapsed(self) -> Optional[float]:
"""获取初始化耗时"""
if self._start_time and self._end_time:
return (self._end_time - self._start_time).total_seconds()
return None
def get_status(self) -> Dict[str, Any]:
"""获取当前状态"""
return {
"initialized": self._initialized,
"start_time": self._start_time.isoformat() if self._start_time else None,
"end_time": self._end_time.isoformat() if self._end_time else None,
"elapsed": self.init_elapsed,
"connection_names": self._connection_names,
"ready_connections": self._ready_connections,
"error": str(self._error) if self._error else None
}
# 全局单例
db_init_manager = DatabaseInitManager.get_instance()
+1 -2
View File
@@ -16,8 +16,7 @@ from apps.common.monitor import (
from apps.common.monitor.log_stream_service import start_log_stream, stop_log_stream
from globalobjects import EVENT_AGGREGATOR
from core.settings import TURNON_BINLOG_LISTENER, TRUNON_SCHEDULER, MAX_EVENTS_BATCH_SIZE
from core.database import check_db_connections, warmup_connections, start_pool_monitoring
from core.db_init_manager import db_init_manager
from core.database import check_db_connections, warmup_connections, start_pool_monitoring, db_init_manager
@asynccontextmanager
+5 -5
View File
@@ -168,12 +168,12 @@ class DataTable {
if (defaultValue !== undefined && defaultValue !== null) {
titleParts.push(`默认: ${defaultValue}`);
}
if (col.sortable) titleParts.push('点击排序');
if (col.readOnly) titleParts.push('只读');
if (isEnum) titleParts.push('枚举');
if (isForeignKey) titleParts.push('外键');
if (isPrimaryKey) titleParts.push(isCompositeKey ? '联合主键' : '主键');
if (isRequired) titleParts.push('必填');
if (isPrimaryKey) titleParts.push(isCompositeKey ? '联合主键' : '主键');
if (isForeignKey) titleParts.push('外键');
if (isEnum) titleParts.push('枚举');
if (col.readOnly) titleParts.push('只读');
if (col.sortable) titleParts.push('点击排序');
return `
<th