缓冲表drop逻辑

This commit is contained in:
2026-05-22 22:03:55 +08:00
parent dc8e347e0d
commit 7108454c59
3 changed files with 93 additions and 7 deletions
+4 -1
View File
@@ -121,7 +121,8 @@ STAGING_TABLE_CONFIG = {
# "validator": validate_mat_wc_rules,
# }
],
"pre_batch_hook": [mat_wc_route_check_hook]
"pre_batch_hook": [mat_wc_route_check_hook],
"drop_fields": ["materialno", "matver"],
},
"t_mat_wc_bom": {
@@ -171,6 +172,7 @@ STAGING_TABLE_CONFIG = {
create_range_rule("scrap", 0, 100, "损耗率必须在0-100之间"),
],
"pre_batch_hook": [bom_structure_check_hook],
"drop_fields": ["productno", "matver"],
},
"t_mold": {
@@ -229,6 +231,7 @@ STAGING_TABLE_CONFIG = {
# "validator": validate_mat_wc_mold_rules,
# }
],
"drop_fields": ["materialno", "itemno"],
},
}
+19 -5
View File
@@ -17,7 +17,7 @@ from .schemas import (
#DeleteSupply
)
from .utils.common import standard_response, drop_matched_data
from .utils.common import standard_response, drop_matched_data, mark_as_removing
from .utils.db_operation import db_exec_sql, db_managers, db_query, db_supsert, db_bupsert, db_delete, call_dbprocdure, db_update_by_index
# from project_files import hap_conn
from apps.data_opt.utils.data_processor import DataProcessor
@@ -83,7 +83,8 @@ async def dispatch_to_staging(
data: List,
source_system: str = "API",
dedup_strategy: DedupStrategyEnum = DedupStrategyEnum.OVERWRITE,
update_mode: UpdateModeEnum = UpdateModeEnum.PARTIAL
update_mode: UpdateModeEnum = UpdateModeEnum.PARTIAL,
drop: Literal["all", "matched"] = None
) -> dict:
if not STAGING_MODULES_AVAILABLE:
return {
@@ -108,6 +109,16 @@ async def dispatch_to_staging(
model = config["model"]
table_name = f"{table_key}_staging"
if drop:
drop_fields = config.get("drop_fields")
await mark_as_removing(
model_class=model,
table_name=table_name,
drop=drop,
data_list=[item.model_dump() if hasattr(item, "model_dump") else item for item in data],
drop_fields=drop_fields
)
try:
data_list = []
for item in data:
@@ -541,7 +552,8 @@ async def post_mat_wc(
data=data,
source_system=source_system,
dedup_strategy=dedup_strategy,
update_mode=update_mode
update_mode=update_mode,
drop=drop
)
return map_staging_response_to_direct(staging_response)
@@ -655,7 +667,8 @@ async def post_mat_wc_bom(
data=data,
source_system=source_system,
dedup_strategy=dedup_strategy,
update_mode=update_mode
update_mode=update_mode,
drop=drop
)
return map_staging_response_to_direct(staging_response)
@@ -769,7 +782,8 @@ async def post_mat_wc_mold(
data=data,
source_system=source_system,
dedup_strategy=dedup_strategy,
update_mode=update_mode
update_mode=update_mode,
drop=drop
)
return map_staging_response_to_direct(staging_response)
+70 -1
View File
@@ -253,4 +253,73 @@ async def drop_matched_data(data: List[Any], db_names: str, table_name: str, mat
await db_delete(db_names=db_names, model_or_tablename=table_name, filter_string=filter_string)
except Exception as e:
logger.error(f"删除数据失败: {str(e)}")
raise e
raise e
async def mark_as_removing(
model_class,
table_name: str,
drop: Literal["all", "matched"],
data_list: List[Dict] = None,
drop_fields: List[str] = None
) -> int:
"""
将缓冲表数据标记为removing状态用于staging模式drop功能
Args:
model_class: Tortoise ORM模型类
table_name: 表名
drop: 标记方式"all""matched"
data_list: 新数据列表drop="matched"时需要
drop_fields: 匹配字段列表drop="matched"时需要
Returns:
标记的记录数
"""
from globalobjects import logger as log_config
from apps.data_opt.mds._base import StagingStatus
logger = log_config.get_logger(__name__)
if drop == "all":
count = await model_class.all().update(_status=StagingStatus.REMOVING)
logger.info(f"drop=all: 已将 {table_name} 全部 {count} 条记录标记为removing")
return count
elif drop == "matched":
if not data_list or not drop_fields:
logger.warning(f"drop=matched: 缺少data_list或drop_fields,跳过")
return 0
unique_combinations = set()
for item in data_list:
field_values = []
for field in drop_fields:
value = item.get(field)
if value is not None and value != '':
field_values.append(value)
else:
break
if len(field_values) == len(drop_fields):
unique_combinations.add(tuple(field_values))
if not unique_combinations:
logger.info(f"drop=matched: 无有效匹配值,跳过")
return 0
total_marked = 0
batch_size = 100
combinations_list = list(unique_combinations)
for i in range(0, len(combinations_list), batch_size):
batch = combinations_list[i:i+batch_size]
for values in batch:
condition = {f: v for f, v in zip(drop_fields, values)}
count = await model_class.filter(**condition).update(_status=StagingStatus.REMOVING)
total_marked += count
logger.info(f"drop=matched: 已将 {table_name}{total_marked} 条记录标记为removing (匹配{len(unique_combinations)}组)")
return total_marked
return 0