feat(mds): 实现removing状态数据在推送时删除正式表对应记录

核心逻辑:
- removing状态数据能且仅能在推送时删除,禁止自动清理
- 增量推送:根据business_keys删除正式表+缓冲表记录
- 刷新推送:正式表已TRUNCATE,只删除缓冲表记录

修改内容:
1. staging_cleaner.py
   - sync_to_production新增处理removing状态数据逻辑
   - 根据推送模式区分删除策略
   - 增量模式:按business_keys构建WHERE条件删除正式表记录
   - 刷新模式:直接删除缓冲表记录(正式表已TRUNCATE)

2. staging_routers.py
   - 同步路由返回统计新增removed字段
   - 更新返回消息显示删除数量

数据一致性保证:
- 缓冲表removing记录 → 推送时 → 正式表对应记录一并删除
- 确保缓冲表和正式表数据状态一致
This commit is contained in:
2026-05-22 14:23:27 +08:00
parent ad03e904d0
commit d39287b7ee
2 changed files with 63 additions and 3 deletions
+58 -1
View File
@@ -1078,12 +1078,69 @@ class StagingProcessor:
raise ValueError(f"表配置不完整: {table_name}")
target_db_name = target_db if target_db else MYAPS_MAIN_DB
stats = {"synced": 0, "failed": 0, "skipped": 0, "target_db": target_db_name, "synced_staging_ids": []}
stats = {"synced": 0, "failed": 0, "skipped": 0, "target_db": target_db_name, "synced_staging_ids": [], "removed": 0}
pg_conn = Tortoise.get_connection(THIS_DB_NAME)
staging_table_name = staging_model._meta.db_table
target_table_name = target_model._meta.db_table
# ========== 步骤1:处理removing状态的数据 ==========
# removing状态的数据只能在推送时删除,确保缓冲表和正式表数据一致
removing_query = f'SELECT * FROM "{staging_table_name}" WHERE "_status" = $1'
removing_result = await pg_conn.execute_query(removing_query, ("removing",))
removing_records = removing_result[1] if removing_result[1] else []
if removing_records:
logger.info(f"处理待删除数据 [{table_name}]: 找到{len(removing_records)}条removing状态记录")
# 获取业务主键字段
business_keys = config.get("business_keys", [])
if mode == "refresh":
# 刷新模式:正式表已被TRUNCATE,直接删除缓冲表记录
delete_ids = [r["_staging_id"] for r in removing_records]
placeholders = ", ".join([f"${i+1}" for i in range(len(delete_ids))])
delete_query = f'DELETE FROM "{staging_table_name}" WHERE "_staging_id" IN ({placeholders})'
await pg_conn.execute_query(delete_query, tuple(delete_ids))
stats["removed"] = len(delete_ids)
logger.info(f"刷新模式:已删除缓冲表removing记录 {len(delete_ids)}")
else:
# 增量模式:需要删除正式表中对应的记录
mysql_conn = Tortoise.get_connection(target_db_name)
# 构建删除条件:根据business_keys批量删除
for record in removing_records:
if business_keys:
# 构建WHERE条件
conditions = []
values = []
for pk in business_keys:
# 获取字段对应的数据库列名
field_obj = target_model._meta.fields_map.get(pk)
db_col = field_obj.source_field if field_obj and hasattr(field_obj, 'source_field') and field_obj.source_field else pk
value = record.get(db_col)
if value is not None:
conditions.append(f"`{db_col}` = %s")
values.append(value)
if conditions:
delete_sql = f"DELETE FROM `{target_table_name}` WHERE {' AND '.join(conditions)}"
try:
await mysql_conn.execute_query(delete_sql, tuple(values))
except Exception as e:
logger.error(f"删除正式表记录失败 [{table_name}]: {str(e)}, 条件={conditions}")
# 删除缓冲表记录
await pg_conn.execute_query(
f'DELETE FROM "{staging_table_name}" WHERE "_staging_id" = $1',
(record["_staging_id"],)
)
stats["removed"] += 1
logger.info(f"增量模式:已删除正式表+缓冲表removing记录 {stats['removed']}")
# ========== 步骤2:处理同步逻辑 ==========
if mode == "refresh" and not skip_truncate:
mysql_conn = Tortoise.get_connection(target_db_name)
truncate_query = f'TRUNCATE TABLE `{target_table_name}`'
+5 -2
View File
@@ -412,6 +412,7 @@ async def sync_to_production(
total_failed = sum(s.get("failed") or 0 for s in all_stats.values())
total_skipped = sum(s.get("skipped") or 0 for s in all_stats.values())
total_dedup = sum(len(s.get("dedup_staging_ids", [])) for s in all_stats.values())
total_removed = sum(s.get("removed") or 0 for s in all_stats.values())
# 将details转为数组格式
details_list = [
@@ -420,20 +421,22 @@ async def sync_to_production(
"synced": len(stats.get("synced_staging_ids", [])),
"failed": stats.get("failed") or 0,
"skipped": stats.get("skipped") or 0,
"dedup": len(stats.get("dedup_staging_ids", []))
"dedup": len(stats.get("dedup_staging_ids", [])),
"removed": stats.get("removed") or 0
}
for db_name, stats in all_stats.items()
]
return standard_response(
success=1,
message=f"同步完成: {len(target_db_list)}个账套, 成功{total_synced}条, 去重失败{total_dedup}条, 其他失败{total_failed}",
message=f"同步完成: {len(target_db_list)}个账套, 成功{total_synced}条, 删除{total_removed}条, 去重失败{total_dedup}条, 其他失败{total_failed}",
data={
"target_dbs": target_db_list,
"total_synced": total_synced,
"total_failed": total_failed,
"total_skipped": total_skipped,
"total_dedup": total_dedup,
"total_removed": total_removed,
"details": details_list
}
)