解决冲突

This commit is contained in:
2026-04-18 09:22:03 +08:00
parent db4b6d267a
commit d60bc85539
8 changed files with 194 additions and 18 deletions
+7 -2
View File
@@ -5,6 +5,11 @@
"""
from .service import MonitorService
from .background_checker import db_health_checker, start_db_health_checker, stop_db_health_checker
__all__ = ["MonitorService"]
__all__ = [
"MonitorService",
"db_health_checker",
"start_db_health_checker",
"stop_db_health_checker",
]
+162
View File
@@ -0,0 +1,162 @@
"""
数据库健康检查器 - 后台定时任务模块
提供独立的数据库连接健康检查,不依赖前端访问
"""
import asyncio
import os
import time
from typing import Dict, Any, Optional
from globalobjects import logger as log_config
from .collectors import DatabaseCollector
from .allert import AlertType, alert_sender
logger = log_config.get_logger(__name__)
class DatabaseHealthChecker:
"""
数据库健康检查器 - 后台定时任务
独立于前端 WebSocket 连接,定时检查数据库连接状态
"""
def __init__(
self,
interval: int = 60,
alert_cooldown: int = 300,
enabled: bool = True,
reuse_cache: bool = True,
cache_threshold: int = 30
):
self._interval = interval
self._alert_cooldown = alert_cooldown
self._enabled = enabled
self._reuse_cache = reuse_cache
self._cache_threshold = cache_threshold
self._running = False
self._task: Optional[asyncio.Task] = None
self._db_collector = DatabaseCollector()
self._last_alert_time: Dict[str, float] = {}
self._stats = {
"check_count": 0,
"check_success": 0,
"check_failure": 0,
"alert_triggered": 0,
"alert_blocked": 0,
"cache_hits": 0,
}
async def start(self):
"""启动后台检查任务"""
if not self._enabled:
logger.info("数据库健康检查器未启用(DB_HEALTH_CHECK_ENABLED=false")
return
if self._running:
logger.warning("数据库健康检查器已在运行中")
return
self._running = True
self._task = asyncio.create_task(self._check_loop())
logger.info(f"数据库健康检查器已启动(检查间隔: {self._interval}秒, 告警冷却: {self._alert_cooldown}秒)")
async def stop(self):
"""停止后台检查任务"""
if not self._running:
return
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("数据库健康检查器已停止")
async def check_once(self) -> Dict[str, Any]:
"""立即执行一次检查,返回检查结果"""
self._stats["check_count"] += 1
try:
status = await self._db_collector.get_connection_status()
self._stats["check_success"] += 1
return status
except Exception as e:
self._stats["check_failure"] += 1
logger.error(f"数据库健康检查执行失败: {e}")
raise
def get_stats(self) -> Dict[str, Any]:
"""获取检查统计信息"""
return {
**self._stats,
"running": self._running,
"interval": self._interval,
"alert_cooldown": self._alert_cooldown,
}
async def _check_loop(self):
"""定时检查循环"""
logger.info("数据库健康检查循环已启动")
while self._running:
try:
await self._perform_check()
except Exception as e:
logger.error(f"数据库健康检查循环异常: {e}")
await asyncio.sleep(self._interval)
async def _perform_check(self):
"""执行检查逻辑"""
self._stats["check_count"] += 1
try:
status = await self._db_collector.get_connection_status()
self._stats["check_success"] += 1
except Exception as e:
self._stats["check_failure"] += 1
logger.error(f"获取数据库连接状态失败: {e}")
return
unhealthy_count = status.get("summary", {}).get("unhealthy", 0)
if unhealthy_count > 0:
alert_key = f"db_unhealthy_{unhealthy_count}"
current_time = time.time()
last_alert = self._last_alert_time.get(alert_key, 0)
if current_time - last_alert >= self._alert_cooldown:
await alert_sender.trigger_alert(
AlertType.DB_CONNECTION,
f"数据库连接异常: {unhealthy_count} 个连接不健康"
)
self._last_alert_time[alert_key] = current_time
self._stats["alert_triggered"] += 1
logger.warning(f"数据库健康检查触发告警: {unhealthy_count} 个连接不健康")
else:
self._stats["alert_blocked"] += 1
remaining = int(self._alert_cooldown - (current_time - last_alert))
logger.debug(f"数据库健康检查告警被冷却拦截,剩余 {remaining}")
db_health_checker = DatabaseHealthChecker(
interval=int(os.getenv('DB_HEALTH_CHECK_INTERVAL', '60')),
alert_cooldown=int(os.getenv('DB_HEALTH_CHECK_COOLDOWN', '300')),
enabled=os.getenv('DB_HEALTH_CHECK_ENABLED', 'true').lower() == 'true',
)
async def start_db_health_checker():
"""启动数据库健康检查器"""
await db_health_checker.start()
async def stop_db_health_checker():
"""停止数据库健康检查器"""
await db_health_checker.stop()
+2 -2
View File
@@ -1,9 +1,9 @@
from .settings import BASE_DIR
from .settings import BASE_DIR, SQLITE_FILE
from .database import TORTOISE_ORM_CONFIG
__all__ = [
"TORTOISE_ORM_CONFIG",
"BASE_DIR"
"BASE_DIR", "SQLITE_FILE"
]
+4 -4
View File
@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
from tortoise.contrib.fastapi import register_tortoise
from core.settings import (
BASE_DIR,
BASE_DIR, SQLITE_FILE,
MYAPS_MAIN_DB, MYAPS_DBSET_LIST, MYAPS_DB_HOST, MYAPS_DB_PORT, MYAPS_DB_USER, MYAPS_DB_PASSWORD,
THIS_DB_NAME, THIS_DB_HOST, THIS_DB_PORT, THIS_DB_USER, THIS_DB_PASSWORD
)
@@ -35,10 +35,10 @@ minsize_per_db = min(2, maxsize_per_db // 2)
# 数据库配置
connections = {
"local_data": {
SQLITE_FILE: {
"engine": "tortoise.backends.sqlite",
"credentials": {
"file_path": BASE_DIR / "storage" / "local_data.sqlite3", # 统一管理数据文件
"file_path": BASE_DIR / "storage" / f"{SQLITE_FILE}.sqlite3", # 统一管理数据文件
"journal_mode": "WAL", # 写前日志,提升并发性能
"synchronous": "NORMAL", # 性能与安全的平衡
"cache_size": -100000, # 100MB 内存缓存
@@ -81,7 +81,7 @@ TORTOISE_ORM_CONFIG = {
},
"monitor_models": {
"models": ["apps.common.monitor.models", "aerich.models"],
"default_connection": "local_data" # 使用local_data数据库
"default_connection": SQLITE_FILE # 使用SQLite数据库
},
},
}
+9 -2
View File
@@ -5,6 +5,7 @@ from globalobjects import logger as log_config
from apps.data_opt.utils.scheduler import scheduler_manager, get_scheduler_status, initialize_scheduler
from apps.data_opt.utils.mysqlmonitor import mysql_monitor
from apps.common.utils.resource_monitor import resource_monitor
from apps.common.monitor import start_db_health_checker, stop_db_health_checker
from globalobjects import EVENT_AGGREGATOR
from core.settings import TURNON_DBMONITOR, TRUNON_SCHEDULER
from core.database import check_db_connections, warmup_connections, start_pool_monitoring
@@ -64,7 +65,10 @@ async def lifespan(app):
# 启动连接池监控任务
asyncio.create_task(start_pool_monitoring())
log_config.info("连接池监控任务已启动")
# 启动数据库健康检查器(独立后台任务,不依赖前端访问)
await start_db_health_checker()
# 等待一段时间,确保所有服务正常启动
await asyncio.sleep(1)
log_config.info("应用启动完成,开始运行")
@@ -94,6 +98,9 @@ async def lifespan(app):
# 停止事件聚合器
EVENT_AGGREGATOR.stop()
log_config.info("事件聚合器已停止")
# 停止数据库健康检查器
await stop_db_health_checker()
# 关闭统一日志系统
log_config.shutdown_logging()
+6 -4
View File
@@ -17,15 +17,17 @@ load_dotenv(os.getenv('ENV_FILE', os.path.join(BASE_DIR, '.env')))
# 数据库监控开关,默认关闭
TURNON_DBMONITOR = os.getenv("TURNON_DBMONITOR", "False").lower() == "true"
TURNON_DBMONITOR = os.getenv("TURNON_DBMONITOR", "False").lower().strip() == "true"
# Binlog 位置管理器开关,默认关闭
TURNON_BINLOG_POSITION_MANAGER = os.getenv("TURNON_BINLOG_POSITION_MANAGER", "False").lower() == "true"
TURNON_BINLOG_POSITION_MANAGER = os.getenv("TURNON_BINLOG_POSITION_MANAGER", "False").lower().strip() == "true"
# 定时任务开关,默认关闭
TRUNON_SCHEDULER = os.getenv("TRUNON_SCHEDULER", "False").lower() == "true"
TRUNON_SCHEDULER = os.getenv("TRUNON_SCHEDULER", "False").lower().strip() == "true"
# 定时任务执行时间
SCHEDULER_HOUR = os.getenv("SCHEDULER_HOUR") or "6,8,10,12,14,16"
SCHEDULER_MINUTE = os.getenv("SCHEDULER_MINUTE") or "55"
LOG_LEVEL = os.getenv("LOG_LEVEL") or "INFO"
LOG_LEVEL = os.getenv("LOG_LEVEL").strip() or "INFO"
# 本地 SQLite 数据库名称
SQLITE_FILE = os.getenv("SQLITE_FILE").replace(".sqlite3", "").strip() or "local_data"
# 监控阈值配置
+1 -1
View File
@@ -119,7 +119,7 @@ class ApsEvent:
if not _events_registered:
aps_pl_status_a2e_event = ApsEvent(event_type="|pl_status_a2e|", single_handler="handle_pl_status_a2e", batch_handler="batch_handle_pl_status_a2e", description="PL 单据下达")
aps_pr_created_event = ApsEvent(event_type="|pr_created|", single_handler="handle_pr_created", batch_handler="batch_handle_pr_created", description="PR 单据 创建")
aps_pl_typeto_mo_event = ApsEvent(event_type="|pl_typeto_mo|", single_handler="handle_pl_typeto_mo", batch_handler="batch_handle_pl_typeto_mo", description="PL 变更为 MO")
aps_pl_typeto_mo_event = ApsEvent(event_type="|pl_typeto_mo|", single_handler="handle_pl_typeto_mo", batch_handler="batch_handle_pl_typeto_mo", flush_interval=10, description="PL 变更为 MO")
aps_pr_deleted_event = ApsEvent(event_type="|pr_deleted|", single_handler="handle_pr_deleted", batch_handler="batch_handle_pr_deleted", description="PR 单据 删除")
_events_registered = True
logger.success("数据库事件注册", "", "所有事件已成功注册")
+3 -3
View File
@@ -1,9 +1,9 @@
from core import TORTOISE_ORM_CONFIG
from core import TORTOISE_ORM_CONFIG, SQLITE_FILE
# 复用 TORTOISE_ORM_CONFIG 中的 local_data 连接配置和 monitor_models 应用配置
# 复用 TORTOISE_ORM_CONFIG 中的 SQLITE_FILE 连接配置和 monitor_models 应用配置
monitor_orm_config = {
"connections": {
"local_data": TORTOISE_ORM_CONFIG["connections"]["local_data"]
SQLITE_FILE: TORTOISE_ORM_CONFIG["connections"][SQLITE_FILE]
},
"apps": {
"monitor_models": TORTOISE_ORM_CONFIG["apps"]["monitor_models"]