fix(mds): 修复数据去重bug并添加手动删除权限控制

数据去重Bug修复:
- 修复复合主键查询错误(只使用第一个主键字段)
- 修复existing_map只保留最后一条记录的问题
- 修复内部重复数据未检查数据库已存在记录的问题
- 删除staging_routers.py中重复调用apply_dedup_strategy的代码

手动删除权限控制:
- 新增ManualRemoveMode枚举(never/now/next)
- 根据MDS_MANUAL_REMOVE环境变量控制删除行为
- never: 禁止手动删除,隐藏删除按钮
- now: 立即删除
- next: 标记为removing状态,下次推送时删除
- 清理历史数据接口也受权限控制
- 前端通过模板注入获取配置,无需API请求

修改文件:
- apps/data_opt/mds/utils/duplicate_checker.py
- apps/data_opt/mds/staging_routers.py
- apps/data_opt/mds/_base.py
- core/routes_register.py
- static/mds/js/data-table.js
- static/mds/js/mds-page-controller.js
- static/mds/pages/template.html
This commit is contained in:
2026-05-22 13:58:05 +08:00
parent 78269d8d74
commit ad03e904d0
13 changed files with 549 additions and 289 deletions
-78
View File
@@ -1,78 +0,0 @@
from fastapi import HTTPException
from tortoise import Tortoise
from globalobjects import logger as log_config
from core.settings import THIS_DB_NAME
from typing import Optional
import asyncio
async def get_db_connection_safely(db_name: Optional[str] = None, max_wait: float = 15.0):
"""
安全获取数据库连接,包含异常处理和友好提示
Args:
db_name: 数据库连接名称,默认使用THIS_DB_NAME
max_wait: 最大等待时间(秒),用于等待ORM初始化
Returns:
数据库连接对象
Raises:
HTTPException: 数据库连接失败时返回500错误
"""
if db_name is None:
db_name = THIS_DB_NAME
try:
if not Tortoise._inited:
# 使用智能等待管理器
from core.database import db_init_manager
log_config.info(f"⏳ 等待数据库初始化完成: {db_name}")
result = await db_init_manager.wait_for_init(max_wait=max_wait)
if not result["success"]:
error_msg = f"数据库初始化失败({result['elapsed']:.1f}秒)"
log_config.error(f"{error_msg}: {result.get('error')}")
raise HTTPException(
status_code=500,
detail=f"数据库服务初始化失败,请稍后重试"
)
log_config.info(f"✅ 数据库就绪,获取连接: {db_name}")
conn = Tortoise.get_connection(db_name)
return conn
except KeyError:
log_config.error(f"❌ 数据库连接不存在: {db_name}")
raise HTTPException(
status_code=500,
detail="数据库连接配置错误,请联系管理员"
)
except Exception as e:
if isinstance(e, HTTPException):
raise
log_config.error(f"❌ 获取数据库连接异常: {db_name} - {type(e).__name__}: {e}")
raise HTTPException(
status_code=500,
detail="数据库连接失败,请检查服务配置或稍后重试"
)
conn = Tortoise.get_connection(db_name)
return conn
except KeyError:
log_config.error(f"❌ 数据库连接不存在: {db_name}")
raise HTTPException(
status_code=500,
detail="数据库连接配置错误,请联系管理员"
)
except Exception as e:
if isinstance(e, HTTPException):
raise
log_config.error(f"❌ 获取数据库连接异常: {db_name} - {type(e).__name__}: {e}")
raise HTTPException(
status_code=500,
detail="数据库连接失败,请检查服务配置或稍后重试"
)
+8
View File
@@ -26,6 +26,7 @@ class StagingStatus(str, Enum):
RELATION_ERROR = ("relation_error", "联检错误", "warning")
SYNC_ERROR = ("sync_error", "推送失败", "warning")
SYNCED = ("synced", "已推送", "secondary")
REMOVING = ("removing", "待删除", "secondary")
def __new__(cls, value, label, color):
obj = str.__new__(cls, value)
@@ -50,6 +51,13 @@ class StagingStatus(str, Enum):
return None
class ManualRemoveMode(str, Enum):
"""手动删除模式"""
NEVER = "never" # 禁止手动删除
NOW = "now" # 立即删除
NEXT = "next" # 下次推送时删除
class ErrorType(str, Enum):
"""错误类型枚举"""
REQUIRED_FIELD = "required_field" # 必填字段缺失
+163 -87
View File
@@ -8,7 +8,7 @@ from datetime import datetime, timezone
from fastapi import APIRouter, Query, Body, HTTPException, status, Request, UploadFile, File
from tortoise.models import Q
from ._base import StagingStatus, INTERNAL_FIELDS, EXCLUDE_FIELDS, TABLE_PROCESS_ORDER, convert_record_to_lowercase, generate_validation_rules_doc, get_field_map
from ._base import StagingStatus, ManualRemoveMode, INTERNAL_FIELDS, EXCLUDE_FIELDS, TABLE_PROCESS_ORDER, convert_record_to_lowercase, generate_validation_rules_doc, get_field_map
from .staging_models import (
TMaterialStaging, TWorkcenterStaging, TMatVerStaging,
TMatWcStaging, TMatWcBomStaging, TMoldStaging, TMatWcMoldStaging,
@@ -18,8 +18,8 @@ from .staging_cleaner import StagingProcessor, DataTransformer, STAGING_TABLE_CO
from .config_generator import TABLE_DISPLAY_CONFIG, SYSTEM_RUNTIME_CONFIG
from apps.io_api.utils.common import standard_response
from apps.io_api.utils.db_operation import db_bupsert
from apps.common.utils.db_helpers import get_db_connection_safely
from core.settings import MYAPS_MAIN_DB, THIS_DB_NAME, MYAPS_DBSET_LIST
from core.database import get_db_connection_safely
from core.settings import MYAPS_MAIN_DB, THIS_DB_NAME, MYAPS_DBSET_LIST, MDS_MANUAL_REMOVE
from globalobjects import logger as log_config
logger = log_config.get_logger(__name__)
@@ -131,8 +131,11 @@ async def delete_existing_records(model_class, table_name: str, records: List[Di
conditions[pk] = value
if conditions:
# 删除所有匹配的记录(而非只删一条)
deleted = await model_class.filter(**conditions).delete()
count += deleted
if deleted > 1:
logger.warning(f"删除重复记录: {table_name} {conditions} 删除了{deleted}")
return count
@@ -772,19 +775,47 @@ async def clear_staging(
):
"""清空缓冲表数据"""
try:
# 检查删除权限
if MDS_MANUAL_REMOVE == ManualRemoveMode.NEVER:
return standard_response(
success=0,
message="禁止手动删除数据(MDS_MANUAL_REMOVE=never"
)
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if not staging_model:
raise ValueError(f"未知的缓冲表: {table_name}")
if status_filter:
deleted = await staging_model.filter(_status=status_filter).delete()
else:
deleted = await staging_model.all().delete()
if MDS_MANUAL_REMOVE == ManualRemoveMode.NOW:
# 立即删除
if status_filter:
deleted = await staging_model.filter(_status=status_filter).delete()
else:
deleted = await staging_model.all().delete()
return standard_response(
success=1,
message=f"已删除 {deleted} 条记录"
)
return standard_response(
success=1,
message=f"已删除 {deleted} 条记录"
)
elif MDS_MANUAL_REMOVE == ManualRemoveMode.NEXT:
# 标记为待删除状态
if status_filter:
marked = await staging_model.filter(_status=status_filter).update(_status=StagingStatus.REMOVING)
else:
marked = await staging_model.all().update(_status=StagingStatus.REMOVING)
return standard_response(
success=1,
message=f"已标记 {marked} 条记录为待删除状态,将在下次推送时统一删除"
)
else:
return standard_response(
success=0,
message=f"未知的删除模式: {MDS_MANUAL_REMOVE}"
)
except Exception as e:
logger.error(f"清空缓冲表失败: {str(e)}")
return standard_response(success=0, message=str(e))
@@ -852,67 +883,74 @@ async def get_monitor_summary(request: Request):
return standard_response(success=0, message=str(e))
@rt.post("/cleanup/old_data", summary="清理历史数据")
async def cleanup_old_data(
request: Request,
days: int = Query(30, description="保留最近N天的数据"),
status_filter: Optional[StagingStatus] = Query(StagingStatus.SYNCED, description="清理的状态类型"),
dry_run: bool = Query(True, description="仅统计不删除")
):
"""清理已同步的历史数据"""
try:
from tortoise import Tortoise
from datetime import timedelta
from core.settings import THIS_DB_NAME
# @rt.post("/cleanup/old_data", summary="清理历史数据")
# async def cleanup_old_data(
# request: Request,
# days: int = Query(30, description="保留最近N天的数据"),
# status_filter: Optional[StagingStatus] = Query(StagingStatus.SYNCED, description="清理的状态类型"),
# dry_run: bool = Query(True, description="仅统计不删除")
# ):
# """清理已同步的历史数据"""
# try:
# # 检查删除权限(清理历史数据属于删除操作)
# if MDS_MANUAL_REMOVE == ManualRemoveMode.NEVER:
# return standard_response(
# success=0,
# message="禁止清理历史数据(MDS_MANUAL_REMOVE=never"
# )
conn = await get_db_connection_safely(THIS_DB_NAME)
# from tortoise import Tortoise
# from datetime import timedelta
# from core.settings import THIS_DB_NAME
cutoff_date = datetime.now() - timedelta(days=days)
# conn = await get_db_connection_safely(THIS_DB_NAME)
tables = [
"t_material_staging",
"t_workcenter_staging",
"t_mat_ver_staging",
"t_mat_wc_staging",
"t_mat_wc_bom_staging",
"t_mold_staging",
"t_mat_wc_mold_staging",
]
# cutoff_date = datetime.now() - timedelta(days=days)
results = []
for table in tables:
count_query = f'''
SELECT COUNT(*) as cnt FROM "{table}"
WHERE "_status" = $1 AND "_synced_time" < $2
'''
result = await conn.execute_query(count_query, (status_filter.value, cutoff_date))
count = result[1][0]['cnt'] if result[1] else 0
# tables = [
# "t_material_staging",
# "t_workcenter_staging",
# "t_mat_ver_staging",
# "t_mat_wc_staging",
# "t_mat_wc_bom_staging",
# "t_mold_staging",
# "t_mat_wc_mold_staging",
# ]
# results = []
# for table in tables:
# count_query = f'''
# SELECT COUNT(*) as cnt FROM "{table}"
# WHERE "_status" = $1 AND "_synced_time" < $2
# '''
# result = await conn.execute_query(count_query, (status_filter.value, cutoff_date))
# count = result[1][0]['cnt'] if result[1] else 0
if not dry_run and count > 0:
delete_query = f'''
DELETE FROM "{table}"
WHERE "_status" = $1 AND "_synced_time" < $2
'''
await conn.execute_query(delete_query, (status_filter.value, cutoff_date))
# if not dry_run and count > 0:
# delete_query = f'''
# DELETE FROM "{table}"
# WHERE "_status" = $1 AND "_synced_time" < $2
# '''
# await conn.execute_query(delete_query, (status_filter.value, cutoff_date))
results.append({
"table": table,
"would_delete": count,
"deleted": count if not dry_run else 0
})
# results.append({
# "table": table,
# "would_delete": count,
# "deleted": count if not dry_run else 0
# })
return standard_response(
success=1,
message=f"{'统计完成(未删除)' if dry_run else '清理完成'}",
data={
"cutoff_date": cutoff_date.isoformat(),
"dry_run": dry_run,
"tables": results
}
)
except Exception as e:
logger.error(f"清理历史数据失败: {str(e)}")
return standard_response(success=0, message=str(e))
# return standard_response(
# success=1,
# message=f"{'统计完成(未删除)' if dry_run else '清理完成'}",
# data={
# "cutoff_date": cutoff_date.isoformat(),
# "dry_run": dry_run,
# "tables": results
# }
# )
# except Exception as e:
# logger.error(f"清理历史数据失败: {str(e)}")
# return standard_response(success=0, message=str(e))
@rt.post("/retry_failed/{table_name}", summary="重试失败的记录")
@@ -985,11 +1023,6 @@ async def upload_excel(
table_name, data_list, strategy, update_mode
)
strategy = DedupStrategy(dedup_strategy)
processed_data, handled_data = await apply_dedup_strategy(
table_name, data_list, strategy
)
table_name_staging = f"{table_name}_staging"
inserted_count = 0
@@ -1389,19 +1422,42 @@ async def delete_staging(
):
"""删除单条缓冲表记录"""
try:
from tortoise import Tortoise
# 检查删除权限
if MDS_MANUAL_REMOVE == ManualRemoveMode.NEVER:
return standard_response(
success=0,
message="禁止手动删除数据(MDS_MANUAL_REMOVE=never"
)
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if not staging_model:
raise ValueError(f"未知的缓冲表: {table_name}")
table_name_staging = f"{table_name}_staging"
conn = await get_db_connection_safely(THIS_DB_NAME)
if MDS_MANUAL_REMOVE == ManualRemoveMode.NOW:
# 立即删除
table_name_staging = f"{table_name}_staging"
conn = await get_db_connection_safely(THIS_DB_NAME)
query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" = $1'
await conn.execute_query(query, (staging_id,))
return standard_response(success=1, message="删除成功")
query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" = $1'
await conn.execute_query(query, (staging_id,))
elif MDS_MANUAL_REMOVE == ManualRemoveMode.NEXT:
# 标记为待删除状态
marked = await staging_model.filter(_staging_id=staging_id).update(_status=StagingStatus.REMOVING)
if marked > 0:
return standard_response(success=1, message="已标记为待删除状态,将在下次推送时统一删除")
else:
return standard_response(success=0, message="记录不存在")
return standard_response(success=1, message="删除成功")
else:
return standard_response(
success=0,
message=f"未知的删除模式: {MDS_MANUAL_REMOVE}"
)
except Exception as e:
logger.error(f"删除记录失败: {str(e)}")
return standard_response(success=0, message=str(e))
@@ -1415,7 +1471,12 @@ async def batch_delete_staging(
):
"""批量删除缓冲表记录"""
try:
from tortoise import Tortoise
# 检查删除权限
if MDS_MANUAL_REMOVE == ManualRemoveMode.NEVER:
return standard_response(
success=0,
message="禁止手动删除数据(MDS_MANUAL_REMOVE=never"
)
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if not staging_model:
@@ -1424,18 +1485,33 @@ async def batch_delete_staging(
if not staging_ids:
raise ValueError("staging_ids不能为空")
table_name_staging = f"{table_name}_staging"
conn = await get_db_connection_safely(THIS_DB_NAME)
if MDS_MANUAL_REMOVE == ManualRemoveMode.NOW:
# 立即删除
table_name_staging = f"{table_name}_staging"
conn = await get_db_connection_safely(THIS_DB_NAME)
placeholders = ", ".join([f"${i+1}" for i in range(len(staging_ids))])
query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" IN ({placeholders})'
await conn.execute_query(query, tuple(staging_ids))
return standard_response(success=1, message=f"已删除 {len(staging_ids)} 条记录")
placeholders = ", ".join([f"${i+1}" for i in range(len(staging_ids))])
query = f'DELETE FROM "{table_name_staging}" WHERE "_staging_id" IN ({placeholders})'
elif MDS_MANUAL_REMOVE == ManualRemoveMode.NEXT:
# 标记为待删除状态
marked = await staging_model.filter(_staging_id__in=staging_ids).update(_status=StagingStatus.REMOVING)
return standard_response(
success=1,
message=f"已标记 {marked} 条记录为待删除状态,将在下次推送时统一删除"
)
await conn.execute_query(query, tuple(staging_ids))
return standard_response(
success=1,
message=f"成功删除 {len(staging_ids)} 条记录"
)
else:
return standard_response(
success=0,
message=f"未知的删除模式: {MDS_MANUAL_REMOVE}"
)
except Exception as e:
logger.error(f"批量删除失败: {str(e)}")
return standard_response(success=0, message=str(e))
+118 -34
View File
@@ -258,37 +258,45 @@ class DuplicateChecker:
# 批量查询已存在记录(优化:一次查询替代N次查询)
if unique_indices and self.pk_fields:
pk_field = self.pk_fields[0] # 主键字段
pk_values_to_query = []
idx_to_pk_map = {}
for idx in unique_indices:
data = data_list[idx]
pk_value = self._get_pk_value(data)
if pk_value:
pk_values_to_query.append(pk_value)
idx_to_pk_map[idx] = pk_value
# 去重后批量查询
unique_pk_values = list(set(pk_values_to_query))
# logger.info(f"去重检测: 表={self.table_name}, 主键字段={pk_field}, 待查询主键值数量={len(unique_pk_values)}, 示例={unique_pk_values[:3]}...")
if unique_pk_values:
if idx_to_pk_map:
try:
# 一次性查询所有已存在的记录
existing_records = await self.staging_model.filter(
**{f"{pk_field}__in": unique_pk_values}
).all()
# 对于复合主键,需要查询所有可能的记录后在内存中匹配
# 步骤1:收集每个主键字段的所有可能值
pk_field_values = {pk: set() for pk in self.pk_fields}
for idx, pk_value in idx_to_pk_map.items():
data = data_list[idx]
for pk in self.pk_fields:
val = data.get(pk)
if val is not None and val != '':
pk_field_values[pk].add(val)
# 构建主键 -> 记录的映射
# 步骤2:构建查询条件(每个字段用 __in)
query = self.staging_model.all()
for pk, values in pk_field_values.items():
if values:
query = query.filter(**{f"{pk}__in": list(values)})
# 步骤3:执行查询
existing_records = await query.all()
# 步骤4:构建复合主键 -> 记录列表的映射(注意:可能有多条相同主键的记录)
existing_map = {}
for record in existing_records:
pk_val = getattr(record, pk_field)
# 统一转换为字符串进行比较
existing_map[str(pk_val)] = record
pk_val = self._get_pk_value_from_record(record)
if pk_val:
if pk_val not in existing_map:
existing_map[pk_val] = []
existing_map[pk_val].append(record)
# logger.info(f"批量查询已存在记录: 找到{len(existing_map)}条, 映射键={list(existing_map.keys())[:3]}")
logger.debug(f"批量查询已存在记录: 表={self.table_name}, 主键字段={self.pk_fields}, 查询返回{len(existing_records)}条, 唯一主键数={len(existing_map)}")
except Exception as e:
logger.error(f"批量查询已存在记录失败: {str(e)}")
import traceback
@@ -303,18 +311,13 @@ class DuplicateChecker:
data = data_list[idx]
pk_value = idx_to_pk_map.get(idx)
# 详细日志:显示主键值和匹配情况
# if pk_value:
# in_map = pk_value in existing_map
# logger.info(f"分类处理: idx={idx}, pk_value={pk_value}, in_existing_map={in_map}")
if pk_value and pk_value in existing_map:
# 已存在于缓冲表
existing_in_db.append({
"index": idx,
"data": data,
"pk_value": pk_value,
"existing_record": existing_map[pk_value] # 完整记录
"existing_records": existing_map[pk_value] # 所有已存在记录列表
})
else:
# 新数据
@@ -369,6 +372,17 @@ class DuplicateChecker:
return None
return "/".join(values) if values else None
def _get_pk_value_from_record(self, record) -> Optional[str]:
"""从ORM记录对象获取主键值"""
values = []
for pk in self.pk_fields:
value = getattr(record, pk, None)
if value is not None and value != '':
values.append(str(value))
else:
return None
return "/".join(values) if values else None
def mark_duplicates_in_dataframe(
self,
df: pd.DataFrame,
@@ -454,7 +468,7 @@ async def apply_dedup_strategy(
else:
processed_data.append(item["data"])
# 处理内部重复:保留每组最后一条
# 处理内部重复:保留每组最后一条,但需要检查数据库是否已存在
internal_dup_groups = {}
for item in result["duplicates"]:
pk_value = item["pk_value"]
@@ -465,9 +479,56 @@ async def apply_dedup_strategy(
for pk_value, items in internal_dup_groups.items():
# 按索引排序,保留最后一条
items_sorted = sorted(items, key=lambda x: x["index"])
# 最后一条:导入
processed_data.append(items_sorted[-1]["data"])
# 其他条:跳过
last_item = items_sorted[-1]
# 检查该主键是否在数据库中已存在(从existing中查找)
existing_item = None
for ex_item in result["existing"]:
if ex_item["pk_value"] == pk_value:
existing_item = ex_item
break
if existing_item:
# 数据库中已存在,进行内容比对
existing_records = existing_item.get("existing_records", [])
new_data = last_item["data"]
all_same = True
diff_info = ""
if existing_records:
for existing_record in existing_records:
is_same, diff = compare_content(existing_record, new_data, update_mode=update_mode)
if not is_same:
all_same = False
diff_info = diff
break
else:
all_same = False
if all_same:
# 内容相同,跳过
handled_data.append({
"data": new_data,
"reason": "内部重复+内容相同,跳过",
"pk_value": pk_value,
"action": "skip"
})
skip_unchanged_count += 1
else:
# 内容不同,覆盖
processed_data.append(new_data)
handled_data.append({
"data": new_data,
"reason": f"内部重复+覆盖已存在记录 ({diff_info})",
"pk_value": pk_value,
"action": "overwrite",
"existing_count": len(existing_records)
})
else:
# 数据库中不存在,直接导入
processed_data.append(last_item["data"])
# 其他内部重复条:跳过
for item in items_sorted[:-1]:
handled_data.append({
"data": item["data"],
@@ -478,14 +539,36 @@ async def apply_dedup_strategy(
# 处理缓冲表已存在的记录(添加内容比对)
skip_unchanged_count = 0
for item in result["existing"]:
existing_record = item.get("existing_record")
pk_value = item["pk_value"]
# 如果该主键在内部重复组中,跳过(已由duplicates逻辑处理)
if pk_value in internal_dup_pk_values:
handled_data.append({
"data": item["data"],
"reason": "内部重复(已在duplicates中处理)",
"pk_value": pk_value,
"action": "skip"
})
continue
existing_records = item.get("existing_records", [])
new_data = item["data"]
# 内容比对(传入update_mode
is_same, diff_info = compare_content(existing_record, new_data, update_mode=update_mode)
# 对所有已存在记录进行内容比对
all_same = True
diff_info = ""
if existing_records:
for existing_record in existing_records:
is_same, diff = compare_content(existing_record, new_data, update_mode=update_mode)
if not is_same:
all_same = False
diff_info = diff
break
else:
all_same = False
if is_same:
# 内容相同,跳过覆盖
if all_same:
# 所有已存在记录内容相同,跳过覆盖
handled_data.append({
"data": new_data,
"reason": f"内容相同,跳过覆盖",
@@ -500,7 +583,8 @@ async def apply_dedup_strategy(
"data": new_data,
"reason": f"覆盖已存在记录 ({diff_info})",
"pk_value": item["pk_value"],
"action": "overwrite"
"action": "overwrite",
"existing_count": len(existing_records) # 标记要删除的记录数
})
# 日志记录跳过数量
+127 -19
View File
@@ -205,13 +205,32 @@ class BinlogPositionManager:
return {}
def _write_json_file(self, file_path, data):
"""写入 JSON 文件"""
"""写入 JSON 文件(原子操作)
使用临时文件 + os.replace() 确保写入原子性
1. 先写入临时文件
2. 原子替换目标文件
3. 失败时清理临时文件
这样可以防止写入过程中断导致文件损坏
"""
temp_path = f"{file_path}.tmp"
try:
with open(file_path, 'w', encoding='utf-8') as f:
# 先写入临时文件
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 原子替换(os.replace 在 POSIX 系统上是原子操作)
os.replace(temp_path, file_path)
return True
except Exception as e:
logger.error(f"❌ 写入文件 {file_path} 失败: {e}")
# 清理临时文件
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except:
pass
return False
def load_position(self) -> Optional[Dict[str, Any]]:
@@ -1247,25 +1266,61 @@ class MySQLBinlogListener:
retry_count = 0
self._consecutive_errors = 0
except Exception as e:
except pymysql.Error as e:
# MySQL连接/协议错误 → 重试
self._consecutive_errors += 1
retry_count += 1
if not self.running:
break
# 计算等待时间(指数退避,但不超过最大值)
wait_time = min(2 ** min(retry_count, 8), self._max_retry_wait)
# 记录错误(但避免日志过多)
if retry_count <= 5 or retry_count % 10 == 0:
logger.error(f"Binlog 连接失败 ({retry_count}次): {e}")
logger.error(f"MySQL连接错误 ({retry_count}次): {type(e).__name__}: {e}")
logger.info(f"{wait_time}秒后重试...")
# 发送告警(限制频率)
current_time = time.time()
if current_time - last_alert_time > alert_interval:
self._send_alert(f"binlog监听连接失败: {e},已重试 {retry_count}", "error")
self._send_alert(f"MySQL连接错误: {e},已重试 {retry_count}", "error")
last_alert_time = current_time
except (ConnectionError, ConnectionResetError, BrokenPipeError, OSError) as e:
# 网络连接错误 → 重试
self._consecutive_errors += 1
retry_count += 1
if not self.running:
break
wait_time = min(2 ** min(retry_count, 8), self._max_retry_wait)
if retry_count <= 5 or retry_count % 10 == 0:
logger.warning(f"⚠️ 网络连接中断 ({retry_count}次): {type(e).__name__}")
logger.info(f"{wait_time}秒后重试...")
current_time = time.time()
if current_time - last_alert_time > alert_interval:
self._send_alert(f"网络连接中断,已重试 {retry_count}", "warning")
last_alert_time = current_time
except Exception as e:
# 其他未知错误 → 记录详细堆栈
self._consecutive_errors += 1
retry_count += 1
if not self.running:
break
wait_time = min(2 ** min(retry_count, 8), self._max_retry_wait)
if retry_count <= 5 or retry_count % 10 == 0:
logger.error(f"❌ Binlog监听未知错误 ({retry_count}次): {type(e).__name__}: {e}")
logger.info(f"{wait_time}秒后重试...")
current_time = time.time()
if current_time - last_alert_time > alert_interval:
self._send_alert(f"Binlog监听未知错误: {type(e).__name__}: {e},已重试 {retry_count}", "error")
last_alert_time = current_time
# 等待后重试
@@ -1453,16 +1508,42 @@ class MySQLBinlogListener:
try:
self._thread_pool.submit(self._process_with_counter, event)
return
except concurrent.futures.ThreadPoolExecutor.shutdown as e:
# 线程池已关闭 → 正常退出,不重试
logger.debug(f"binlog监听线程池已关闭,跳过事件处理")
self._decrement_pending()
return
except RuntimeError as e:
# 线程池运行时错误(如队列满)
if "queue" in str(e).lower() or "full" in str(e).lower():
# 队列满 → 触发背压告警
if retry < max_retries - 1:
delay = self._get_retry_delay(retry) * 2 # 倍增等待
logger.warning(f"⚠️ 线程池队列满,{retry+1}/{max_retries} 重试 ({delay:.2f}s后)")
time.sleep(delay)
else:
logger.error(f"❌ 线程池队列持续满载,事件转入DLQ")
self._decrement_pending()
self._add_to_dead_letter_queue(event, f"ThreadPool queue full: {e}")
else:
# 其他运行时错误
if retry < max_retries - 1:
delay = self._get_retry_delay(retry)
logger.warning(f"⚠️ 线程池运行时错误,{retry+1}/{max_retries} 重试: {e}")
time.sleep(delay)
else:
logger.error(f"❌ 线程池运行时错误,已达最大重试: {e}")
self._decrement_pending()
self._add_to_dead_letter_queue(event, str(e))
except Exception as e:
# 其他未知错误
if retry < max_retries - 1:
delay = self._get_retry_delay(retry)
logger.warning(f"⚠️ binlog监听线程池任务提交失败,{retry+1}/{max_retries} 重试 ({delay:.2f}s后): {e}")
logger.warning(f"⚠️ 线程池任务提交失败,{retry+1}/{max_retries} 重试 ({delay:.2f}s后): {e}")
time.sleep(delay)
else:
logger.error(f"binlog监听事件处理失败,已达最大重试次数: {e}")
# 减少待处理计数(因为最终失败了)
logger.error(f"❌ 事件处理失败,已达最大重试次数: {e}")
self._decrement_pending()
# 添加到DeadLetter队列
self._add_to_dead_letter_queue(event, str(e))
def _process_with_counter(self, event):
@@ -1542,11 +1623,18 @@ class MySQLBinlogListener:
elif exec_time > 1.0:
logger.debug(f"binlog监听异步处理器 {handler_name} 执行时间: {exec_time:.2f}")
except Exception as e:
# 检查是否是连接池关闭错误
if "pool" in str(e).lower() and "close" in str(e).lower():
# 细化异常类型
error_str = str(e).lower()
if "pool" in error_str and "close" in error_str:
logger.warning(f"binlog监听连接池已关闭,跳过事件处理: {handler_name}")
elif isinstance(e, asyncio.CancelledError):
logger.debug(f"binlog监听异步任务被取消: {handler_name}")
elif isinstance(e, (TimeoutError, asyncio.TimeoutError)):
logger.warning(f"⚠️ 异步处理器 {handler_name} 执行超时")
elif isinstance(e, (ConnectionError, ConnectionResetError, BrokenPipeError)):
logger.warning(f"⚠️ 异步处理器 {handler_name} 网络错误: {type(e).__name__}")
else:
logger.fail(f"binlog监听异步处理器 {handler_name} 执行", "", str(e))
logger.fail(f"binlog监听异步处理器 {handler_name} 执行", "", f"{type(e).__name__}: {e}")
future.add_done_callback(callback)
except Exception as e:
# 检查是否是连接池关闭错误
@@ -1568,17 +1656,37 @@ class MySQLBinlogListener:
elif exec_time > 1.0:
logger.debug(f"binlog监听同步处理器 {handler_name} 执行时间: {exec_time:.2f}")
return
except (ConnectionError, ConnectionResetError, BrokenPipeError) as e:
# 网络连接错误 → 重试
if retry < max_retries - 1:
delay = self._get_retry_delay(retry)
logger.warning(f"⚠️ 处理器网络错误,{retry+1}/{max_retries} 重试: {type(e).__name__}")
time.sleep(delay)
else:
logger.error(f"❌ 处理器 {handler_name} 网络错误,已达最大重试: {type(e).__name__}")
except (TimeoutError, asyncio.TimeoutError) as e:
# 超时错误 → 不重试(可能是业务逻辑慢)
logger.warning(f"⚠️ 处理器 {handler_name} 执行超时,跳过重试")
return
except asyncio.CancelledError as e:
# 任务被取消 → 正常退出
logger.debug(f"binlog监听任务被取消: {handler_name}")
return
except Exception as e:
# 检查是否是连接池关闭错误
if "pool" in str(e).lower() and "close" in str(e).lower():
# 其他错误 → 按错误类型处理
error_str = str(e).lower()
if "pool" in error_str and "close" in error_str:
logger.warning(f"binlog监听连接池已关闭,跳过事件处理: {handler_name}")
return
elif retry < max_retries - 1:
delay = self._get_retry_delay(retry)
logger.warning(f"⚠️ binlog监听同步处理器执行失败,{retry+1}/{max_retries} 重试 ({delay:.2f}s后): {e}")
logger.warning(f"⚠️ 处理器执行失败,{retry+1}/{max_retries} 重试 ({delay:.2f}s后): {type(e).__name__}: {e}")
time.sleep(delay)
else:
logger.fail(f"binlog监听同步处理器 {handler_name} 执行", "", str(e))
logger.fail(f"binlog监听处理器 {handler_name} 执行", "", f"{type(e).__name__}: {e}")
def process_binlog_event(self, event):
+55 -17
View File
@@ -9,6 +9,7 @@ from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from tortoise.contrib.fastapi import register_tortoise
from tortoise import Tortoise
from core.settings import (
BASE_DIR, SQLITE_FILE,
MYAPS_MAIN_DB, MYAPS_DBSET_LIST, MYAPS_DB_HOST, MYAPS_DB_PORT, MYAPS_DB_USER, MYAPS_DB_PASSWORD,
@@ -633,23 +634,6 @@ def register_database(app):
add_exception_handlers=True,
)
# 注册启动事件:在Tortoise初始化后通知管理器
@app.on_event("startup")
async def notify_db_init_complete():
from tortoise import Tortoise
# 等待Tortoise完成初始化(通常register_tortoise已经确保这一点)
max_check = 50
for i in range(max_check):
if Tortoise._inited:
db_init_manager.mark_initialized()
break
await asyncio.sleep(0.1)
else:
error = RuntimeError("Tortoise初始化检查超时")
db_init_manager.mark_error(error)
log_config.error(f"{error}")
log_config.info("✅ Tortoise ORM 已注册到FastAPI应用")
log_config.info(f"连接配置: {connection_names}")
log_config.info(f"应用配置: {list(TORTOISE_ORM_CONFIG['apps'].keys())}")
@@ -764,3 +748,57 @@ async def start_pool_monitoring():
# 每5分钟执行一次
await asyncio.sleep(300)
async def get_db_connection_safely(db_name: Optional[str] = None, max_wait: float = 15.0):
"""
安全获取数据库连接包含异常处理和友好提示
Args:
db_name: 数据库连接名称默认使用THIS_DB_NAME
max_wait: 最大等待时间用于等待ORM初始化
Returns:
数据库连接对象
Raises:
HTTPException: 数据库连接失败时返回500错误
"""
from fastapi import HTTPException
from core.settings import THIS_DB_NAME
if db_name is None:
db_name = THIS_DB_NAME
try:
if not Tortoise._inited:
log_config.info(f"⏳ 等待数据库初始化完成: {db_name}")
result = await db_init_manager.wait_for_init(max_wait=max_wait)
if not result["success"]:
error_msg = f"数据库初始化失败({result['elapsed']:.1f}秒)"
log_config.error(f"{error_msg}: {result.get('error')}")
raise HTTPException(
status_code=500,
detail=f"数据库服务初始化失败,请稍后重试"
)
log_config.info(f"✅ 数据库就绪,获取连接: {db_name}")
conn = Tortoise.get_connection(db_name)
return conn
except KeyError:
log_config.error(f"❌ 数据库连接不存在: {db_name}")
raise HTTPException(
status_code=500,
detail="数据库连接配置错误,请联系管理员"
)
except Exception as e:
if isinstance(e, HTTPException):
raise
log_config.error(f"❌ 获取数据库连接异常: {db_name} - {type(e).__name__}: {e}")
raise HTTPException(
status_code=500,
detail="数据库连接失败,请检查服务配置或稍后重试"
)
+19 -17
View File
@@ -6,6 +6,7 @@ import json
import inspect
import redis
from globalobjects import logger as log_config
from globalobjects.logger import shutdown_logging
from apps.data_opt.utils.scheduler import scheduler_manager, get_scheduler_status, initialize_scheduler
from apps.data_opt.utils.binlog_listener import binlog_listener
from apps.common.utils.resource_monitor import resource_monitor
@@ -39,24 +40,25 @@ async def lifespan(app):
log_config.error(f"❌ 数据库配置验证失败: {e}")
raise
# 使用智能等待管理器
if not Tortoise._inited:
log_config.info("⏳ 等待 Tortoise ORM 初始化...")
# 等待初始化完成(事件驱动,最多30秒)
result = await db_init_manager.wait_for_init(
max_wait=30.0,
early_exit_check=lambda: asyncio.sleep(0) # 可添加实际连接检查
)
if not result["success"]:
error_msg = f"数据库初始化失败: {result.get('error', '未知错误')}"
# register_tortoise已经通过_merge_lifespan_context确保Tortoise先初始化
# 这里只需检查状态并等待连接建立完成
max_wait = 30.0
start_wait = time.time()
while not Tortoise._inited:
elapsed = time.time() - start_wait
if elapsed > max_wait:
error_msg = f"Tortoise ORM 初始化超时({elapsed:.1f}秒)"
log_config.error(f"{error_msg}")
raise RuntimeError(error_msg)
log_config.info(f"✅ Tortoise ORM 初始化完成,耗时: {result['elapsed']:.2f}")
else:
log_config.info("✅ Tortoise ORM 已初始化")
log_config.debug(f"⏳ 等待 Tortoise ORM 初始化... ({elapsed:.1f}s)")
await asyncio.sleep(0.1)
elapsed = time.time() - start_wait
log_config.info(f"✅ Tortoise ORM 已初始化(等待{elapsed:.2f}秒)")
# 标记初始化完成
db_init_manager.mark_initialized()
log_config.set_db_initialized(True)
log_config.info("✅ 日志数据库写入已启用")
@@ -527,7 +529,7 @@ async def lifespan(app):
log_config.info("==================应用关闭完成==================")
# 12. 关闭统一日志系统
log_config.shutdown_logging()
await shutdown_logging()
# 13. 停止实时日志流服务
await stop_log_stream()
+6
View File
@@ -7,6 +7,7 @@ from apps.common.monitor.routers import router as monitor_rt
from apps.common.help.routers import router as help_rt
from apps.data_opt.mds.config_generator import TABLE_DISPLAY_CONFIG
from apps.data_opt.mds.staging_cleaner import STAGING_TABLE_CONFIG
from core.settings import MDS_MANUAL_REMOVE
import os
import json
@@ -96,6 +97,11 @@ def render_mds_page(page_key):
from apps.data_opt.mds.config_generator import get_cached_config
frontend_config = get_cached_config(page_key)
# 注入删除模式配置到前端
if frontend_config:
frontend_config["removeMode"] = MDS_MANUAL_REMOVE
frontend_config["removeAllowed"] = MDS_MANUAL_REMOVE != "never"
# 准备替换变量
replacements = {
"{page_title}": config["page_title"],
+2
View File
@@ -100,6 +100,8 @@ MAX_EVENTS_BATCH_SIZE = max(1, int(os.getenv("MAX_EVENTS_BATCH_SIZE") or 1))
MAX_EVENTS_PER_SECOND = max(1, int(os.getenv("MAX_EVENTS_PER_SECOND") or 1))
MDS_MANUAL_REMOVE = str(os.getenv("MDS_MANUAL_REMOVE", "never")).lower()
# 监控阈值配置
MONITOR_THRESHOLDS = {
# 资源监控阈值
+7 -37
View File
@@ -539,12 +539,9 @@ class DbManager:
# 尝试获取连接,最多尝试3次
for attempt in range(3):
try:
# 检查Tortoise是否已初始化
# Tortoise ORM已由lifespan初始化,禁止运行时重新初始化
if not hasattr(Tortoise, '_inited') or not Tortoise._inited:
logger.warning(f"Tortoise未初始化,尝试重新初始化: {self.connection_name}")
from core.database import TORTOISE_ORM_CONFIG
await Tortoise.init(config=TORTOISE_ORM_CONFIG)
logger.info(f"Tortoise重新初始化成功")
raise RuntimeError("Tortoise ORM未初始化,请等待应用启动完成")
# 获取数据库连接
conn = Tortoise.get_connection(self.connection_name)
@@ -1864,38 +1861,11 @@ class DbManager:
except Exception as discard_error:
logger.warning(f"移除连接时出错: {discard_error}")
# 3. 尝试重新初始化 Tortoise,解决连接池关闭的问题
try:
from core.database import TORTOISE_ORM_CONFIG
if hasattr(Tortoise, '_inited') and Tortoise._inited:
# 尝试关闭所有连接
try:
# 安全检查:确保connections对象和_connections属性存在
if hasattr(connections, '_connections'):
for conn_name in connections._connections.keys():
try:
conn = connections.get(conn_name)
if conn and hasattr(conn, 'close'):
try:
await conn.close()
except Exception as close_error:
logger.warning(f"关闭连接 {conn_name} 时出错: {close_error}")
except Exception as get_conn_error:
logger.warning(f"获取连接 {conn_name} 时出错: {get_conn_error}")
else:
logger.info("连接池未初始化,跳过关闭操作")
except Exception as close_all_error:
logger.warning(f"关闭所有连接时出错: {close_all_error}")
# 尝试重新初始化 Tortoise
try:
logger.info(f"尝试重新初始化 Tortoise")
await Tortoise.init(config=TORTOISE_ORM_CONFIG)
logger.info(f"Tortoise 重新初始化成功")
except Exception as init_error:
logger.warning(f"重新初始化 Tortoise 时出错: {init_error}")
except Exception as init_error:
logger.warning(f"重新初始化 Tortoise 时出错: {init_error}")
# 3. Tortoise ORM已由lifespan初始化,禁止运行时重新初始化
# 运行时重新初始化会破坏TortoiseContext导致所有请求失败
if not hasattr(Tortoise, '_inited') or not Tortoise._inited:
logger.error(f"Tortoise未初始化,无法刷新连接。请等待应用启动完成")
raise RuntimeError("Tortoise ORM未初始化,请等待应用启动完成")
# 3. 等待一段时间,确保连接完全关闭
if fast_mode:
+31
View File
@@ -242,6 +242,9 @@ class DataTable {
});
}
// 检查删除权限
this.checkRemovePermission();
// 排序点击
this.container.querySelectorAll('th.sortable').forEach(th => {
th.addEventListener('click', () => {
@@ -1466,6 +1469,34 @@ class DataTable {
this.updateSelectedCount();
}
/**
* 检查删除权限并控制按钮显示
*/
checkRemovePermission() {
// 从全局配置中获取删除模式(由后端模板注入)
if (typeof MDS_PAGE_CONFIG !== 'undefined' && MDS_PAGE_CONFIG) {
const { removeMode, removeAllowed } = MDS_PAGE_CONFIG;
// 控制批量删除按钮
const batchDeleteBtn = document.getElementById('batchDeleteBtn');
if (batchDeleteBtn) {
if (removeAllowed === false) {
batchDeleteBtn.style.display = 'none';
} else {
batchDeleteBtn.style.display = '';
}
}
// 保存删除模式到实例
this.removeMode = removeMode;
this.removeAllowed = removeAllowed;
} else {
// 配置未注入时的默认行为
console.warn('MDS_PAGE_CONFIG未定义,删除权限检查失败');
this.removeAllowed = true; // 默认允许
}
}
/**
* 销毁组件
*/
+12
View File
@@ -1011,6 +1011,18 @@ class MDSPageController {
saveBtn.onclick = () => this.saveRecord(row._staging_id);
}
// 绑定删除按钮
const deleteBtn = document.getElementById('deleteBtn');
if (deleteBtn) {
// 检查删除权限
if (this.dataTable && this.dataTable.removeAllowed === false) {
deleteBtn.style.display = 'none';
} else {
deleteBtn.style.display = '';
deleteBtn.onclick = () => this.deleteRecord(row._staging_id);
}
}
const detailModal = document.getElementById('detailModal');
if (detailModal && bootstrap.Modal.getInstance(detailModal)) {
bootstrap.Modal.getInstance(detailModal).hide();
+1
View File
@@ -231,6 +231,7 @@
<form id="editForm"></form>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-danger" id="deleteBtn">删除</button>
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="button" class="btn btn-primary" id="saveBtn">保存</button>
</div>