继续优化物料缓冲表

This commit is contained in:
2026-05-13 17:16:49 +08:00
parent 53f1cadc9f
commit 5524762604
22 changed files with 1143 additions and 202 deletions
+1
View File
@@ -16,6 +16,7 @@ dist/
# IDE directories and files
.vscode/
.idea/
.arts/
*.suo
*.ntvs*
*.njsproj
+206 -102
View File
@@ -93,18 +93,53 @@ def fill_defaults(table_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
填充后的数据字典
"""
defaults = SCHEMA_DEFAULTS.get(table_name, {})
if not defaults:
return data
result = data.copy()
for field_name, default_value in defaults.items():
if default_value is None:
continue
if result.get(field_name) in NONE_AND_EMPTY:
if isinstance(default_value, datetime):
default_value = default_value.replace(tzinfo=timezone.utc)
result[field_name] = default_value
logger.debug(f"填充默认值: {table_name}.{field_name} = {default_value}")
# 根据字段类型获取默认值
schema_map = {
"t_material": AcceptMaterial,
"t_workcenter": AcceptWorkcenter,
"t_mat_ver": AcceptMatVer,
"t_mat_wc": AcceptMatWc,
"t_mat_wc_bom": AcceptMatWcBom,
"t_mold": AcceptMold,
"t_mat_wc_mold": AcceptMatWcMold,
}
schema_class = schema_map.get(table_name)
if not schema_class:
return data
# 遍历所有字段,填充默认值
for field_name, field_info in schema_class.model_fields.items():
current_value = result.get(field_name)
# 如果当前值是 None 或空字符串
if current_value in NONE_AND_EMPTY:
# 优先使用 SCHEMA_DEFAULTS 中的默认值
if field_name in defaults and defaults[field_name] is not None:
default_value = defaults[field_name]
if isinstance(default_value, datetime):
default_value = default_value.replace(tzinfo=timezone.utc)
result[field_name] = default_value
logger.debug(f"填充默认值: {table_name}.{field_name} = {default_value}")
else:
# 根据 Field 定义获取默认值
field_default = field_info.default
# 如果 Field 默认值不是 None 且不是 PydanticUndefined
if field_default is not None and str(field_default) != 'PydanticUndefined':
result[field_name] = field_default
logger.debug(f"填充字段默认值: {table_name}.{field_name} = {field_default}")
# 对于可选的字符串字段,填充空字符串
elif field_name in ['size', 'planitem', 'memo', 'free1', 'free2', 'free3']:
result[field_name] = ""
logger.debug(f"填充空字符串: {table_name}.{field_name} = ''")
# 对于可选的数值字段,填充 0
elif field_name in ['lotmin', 'lotmax']:
result[field_name] = 0.0
logger.debug(f"填充零值: {table_name}.{field_name} = 0.0")
return result
@@ -504,15 +539,18 @@ class DataCleaner:
"""保存错误记录"""
try:
for err in errors:
await ValidationError.create(
staging_table=staging_table,
staging_id=err.get("staging_id"),
error_type=err["error_type"],
error_field=err["error_field"],
error_value=err.get("error_value"),
error_message=err["error_message"],
suggestion=self._get_suggestion(err["error_type"])
)
try:
await ValidationError.create(
staging_table=staging_table,
staging_id=err.get("staging_id"),
error_type=err["error_type"],
error_field=err["error_field"],
error_value=err.get("error_value"),
error_message=err["error_message"],
suggestion=self._get_suggestion(err["error_type"])
)
except Exception as e:
logger.warning(f"保存单条错误记录失败(已忽略): {str(e)}")
except Exception as e:
import traceback
logger.error(f"保存错误记录失败: {str(e)}")
@@ -721,110 +759,176 @@ class StagingProcessor:
return stats
async def sync_to_production(self, table_name: str, batch_size: int = 100,
max_retries: int = 3, use_transaction: bool = True) -> Dict[str, int]:
"""同步到正式表(使用原生SQL"""
max_retries: int = 3, use_transaction: bool = True,
mode: str = "incremental", target_db: str = None,
update_status: bool = True) -> Dict[str, int]:
"""同步到正式表(复用自有API的db_bupsert
Args:
table_name: 表名
batch_size: 每批同步数量
max_retries: 最大重试次数
use_transaction: 是否使用事务
mode: 同步模式
- incremental: 仅同步校验通过的记录
- refresh: 清空正式表后同步全部记录
target_db: 目标账套名,为空则使用 MYAPS_MAIN_DB
update_status: 是否更新缓冲表状态为synced(刷新模式多账套时可能需要设为False)
"""
from tortoise import Tortoise
from tortoise.transactions import in_transaction
from core.settings import THIS_DB_NAME, MYAPS_MAIN_DB
from apps.io_api.utils.db_operation import db_bupsert
from apps.io_api.schemas import (
AcceptMaterial, AcceptWorkcenter, AcceptMatVer,
AcceptMatWc, AcceptMatWcBom, AcceptMold, AcceptMatWcMold
)
staging_model = STAGING_MODEL_MAPPING.get(table_name)
target_model = self.TARGET_MODELS.get(table_name)
if not staging_model or not target_model:
raise ValueError(f"未知的表: {table_name}")
stats = {"synced": 0, "failed": 0, "skipped": 0}
validated_records = await staging_model.filter(
_status=StagingStatus.VALIDATED
).filter(
_retry_count__lt=max_retries
).limit(batch_size)
if not validated_records:
return stats
target_db_name = target_db if target_db else MYAPS_MAIN_DB
stats = {"synced": 0, "failed": 0, "skipped": 0, "target_db": target_db_name}
pg_conn = Tortoise.get_connection(THIS_DB_NAME)
mysql_conn = Tortoise.get_connection(MYAPS_MAIN_DB)
staging_table_name = staging_model._meta.db_table
target_table_name = target_model._meta.db_table
if mode == "refresh":
mysql_conn = Tortoise.get_connection(target_db_name)
truncate_query = f'TRUNCATE TABLE `{target_table_name}`'
await mysql_conn.execute_query(truncate_query)
logger.info(f"已清空正式表: {target_table_name} (账套: {target_db_name})")
query = f'SELECT * FROM "{staging_table_name}" WHERE "_status" = $1 AND ("_retry_count" IS NULL OR "_retry_count" < $2) LIMIT $3'
result = await pg_conn.execute_query(query, ("validated", max_retries, batch_size))
records_to_sync = result[1] if result[1] else []
# 检查retry_count分布
retry_check = await pg_conn.execute_query(
f'SELECT "_retry_count", COUNT(*) as cnt FROM "{staging_table_name}" WHERE "_status" = $1 GROUP BY "_retry_count"',
("validated",)
)
retry_dist = {row["_retry_count"]: row["cnt"] for row in retry_check[1]} if retry_check[1] else {}
logger.info(f"同步查询: 表={staging_table_name}, 状态=validated, 重试<{max_retries}, 批次={batch_size}, 找到{len(records_to_sync)}条记录, retry分布={retry_dist}")
if not records_to_sync:
return stats
pk_fields = []
for field_name, field in target_model._meta.fields_map.items():
if field.pk:
pk_fields.append(field_name)
# Schema映射
schema_map = {
"t_material": AcceptMaterial,
"t_workcenter": AcceptWorkcenter,
"t_mat_ver": AcceptMatVer,
"t_mat_wc": AcceptMatWc,
"t_mat_wc_bom": AcceptMatWcBom,
"t_mold": AcceptMold,
"t_mat_wc_mold": AcceptMatWcMold,
}
schema_class = schema_map.get(table_name)
if not schema_class:
stats["skipped"] = len(records_to_sync)
return stats
staging_field_map = {}
target_field_map = {}
for field in staging_model._meta.fields_map.values():
db_col = field.source_field if field.source_field else field.model_field_name
staging_field_map[field.model_field_name] = db_col
for field in target_model._meta.fields_map.values():
db_col = field.source_field if field.source_field else field.model_field_name
target_field_map[field.model_field_name] = db_col
staging_table_name = staging_model._meta.table
target_table_name = target_model._meta.table
for record in validated_records:
try:
staging_data = self._record_to_dict(record, exclude_staging_fields=True)
target_data = {}
for staging_field, value in staging_data.items():
if staging_field not in target_field_map:
continue
target_col = target_field_map.get(staging_field, staging_field)
target_data[target_col] = value
pk_conditions = []
pk_values = []
for pk_field in pk_fields:
pk_col = staging_field_map.get(pk_field, pk_field)
if pk_col in target_data:
pk_conditions.append(f"`{pk_col}` = %s")
pk_values.append(target_data[pk_col])
if not pk_conditions:
stats["skipped"] += 1
data_list = []
staging_ids = []
for raw_record in records_to_sync:
record_dict = dict(raw_record)
staging_id = record_dict.get("_staging_id")
data = {}
for python_field, db_field in staging_field_map.items():
if python_field.startswith('_'):
continue
check_query = f"SELECT COUNT(*) as cnt FROM `{target_table_name}` WHERE {' AND '.join(pk_conditions)}"
result = await mysql_conn.execute_query(check_query, tuple(pk_values))
exists = result[1][0]['cnt'] > 0 if result[1] else False
if exists:
set_parts = []
values = []
for col, val in target_data.items():
if col not in [staging_field_map.get(pk, pk) for pk in pk_fields]:
set_parts.append(f"`{col}` = %s")
values.append(val)
values.extend(pk_values)
sync_query = f"UPDATE `{target_table_name}` SET {', '.join(set_parts)} WHERE {' AND '.join(pk_conditions)}"
else:
columns = [f"`{col}`" for col in target_data.keys()]
placeholders = ", ".join(["%s"] * len(target_data))
sync_query = f"INSERT INTO `{target_table_name}` ({', '.join(columns)}) VALUES ({placeholders})"
values = list(target_data.values())
await mysql_conn.execute_query(sync_query, tuple(values))
update_query = f'UPDATE "{staging_table_name}" SET "_status" = $1, "_synced_time" = $2 WHERE "_staging_id" = $3'
await pg_conn.execute_query(update_query, ("synced", datetime.now(timezone.utc), record._staging_id))
stats["synced"] += 1
value = record_dict.get(db_field)
if isinstance(value, datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
data[python_field] = value
# 填充默认值(关键步骤)
data = fill_defaults(table_name, data)
try:
schema_obj = schema_class(**data)
data_list.append(schema_obj)
staging_ids.append(staging_id)
except Exception as e:
record._retry_count += 1
record._error_msg = str(e)
stats["failed"] += 1
logger.error(f"同步失败 [{table_name}] _staging_id={record._staging_id}, retry={record._retry_count}: {str(e)}")
logger.error(f"Schema转换失败 [{table_name}] _staging_id={staging_id}: {str(e)}")
retry_count = (record_dict.get("_retry_count") or 0) + 1
# Schema转换失败直接标记为rejected,不再重试
error_json = json.dumps([{
"staging_id": staging_id,
"error_type": "schema_error",
"error_field": None,
"error_value": None,
"error_message": f"Schema转换失败: {str(e)}"
}], ensure_ascii=False)
if record._retry_count >= max_retries:
record._status = StagingStatus.REJECTED
logger.warning(f"记录达到最大重试次数,已标记为拒绝: _staging_id={record._staging_id}")
try:
update_query = f'UPDATE "{staging_table_name}" SET "_retry_count" = $1, "_error_msg" = $2, "_status" = $3 WHERE "_staging_id" = $4'
await pg_conn.execute_query(update_query, (retry_count, error_json, "rejected", staging_id))
except Exception as update_err:
logger.error(f"更新失败记录状态时出错: {update_err}")
await record.save()
stats["failed"] = (stats.get("failed") or 0) + 1
if not data_list:
logger.warning(f"同步跳过 [{table_name}]: data_list为空,无有效数据可同步")
return stats
logger.info(f"准备同步 [{table_name}] 账套={target_db_name}: {len(data_list)}条数据, staging_ids={staging_ids[:5]}...")
try:
result = await db_bupsert(
db_names=target_db_name,
model_or_tablename=table_name,
data_list=data_list,
use_orm_or_sql="sql"
)
logger.info(f"db_bupsert结果 [{table_name}]: success={result.success}, message={result.message}, meta={result.meta}")
synced_count = result.affected_rows or 0
stats["synced"] = synced_count
# 只有在 update_status=True 时才更新缓冲表状态
if update_status and synced_count > 0:
synced_time = datetime.now(timezone.utc).replace(tzinfo=None)
for staging_id in staging_ids[:synced_count]:
update_query = f'UPDATE "{staging_table_name}" SET "_status" = $1, "_synced_time" = $2 WHERE "_staging_id" = $3'
await pg_conn.execute_query(update_query, ("synced", synced_time, staging_id))
if result.has_errors:
logger.warning(f"同步部分失败 [{table_name}] 账套={target_db_name}: {result.message}")
stats["failed"] += len(data_list) - (synced_count or 0)
except Exception as e:
import traceback
logger.error(f"同步失败 [{table_name}] 账套={target_db_name}: {str(e)}")
logger.error(traceback.format_exc())
stats["failed"] = len(data_list)
for staging_id in staging_ids:
retry_count = 1
error_json = json.dumps([{
"staging_id": staging_id,
"error_type": "sync_error",
"error_field": None,
"error_value": None,
"error_message": f"同步失败: {str(e)}"
}], ensure_ascii=False)
update_query = f'UPDATE "{staging_table_name}" SET "_retry_count" = $1, "_error_msg" = $2 WHERE "_staging_id" = $3'
await pg_conn.execute_query(update_query, (retry_count, error_json, staging_id))
return stats
+2 -2
View File
@@ -115,8 +115,8 @@ class TransformRule(TortoiseBaseModel):
is_active = fields.BooleanField(default=True, description="是否启用")
priority = fields.IntField(default=0, description="优先级")
description = fields.TextField(null=True, description="规则描述")
createtime = fields.DatetimeField(auto_now_add=True)
updatetime = fields.DatetimeField(auto_now=True)
createtime = fields.DatetimeField(default=lambda: datetime.now(timezone.utc), description="创建时间")
updatetime = fields.DatetimeField(default=lambda: datetime.now(timezone.utc), description="更新时间")
class Meta:
table = "t_transform_rule"
+268 -26
View File
@@ -15,7 +15,7 @@ from apps.data_opt.staging_models import (
from apps.data_opt.staging_cleaner import StagingProcessor, DataTransformer
from apps.io_api.utils.common import standard_response
from apps.io_api.utils.db_operation import db_bupsert
from core.settings import MYAPS_MAIN_DB, THIS_DB_NAME
from core.settings import MYAPS_MAIN_DB, THIS_DB_NAME, MYAPS_DBSET_LIST
from globalobjects import logger as log_config
logger = log_config.get_logger(__name__)
@@ -38,19 +38,20 @@ async def insert_to_staging_table(
exclude_fields: List[str] = None
) -> int:
"""
通用缓冲表SQL插入函数
通用缓冲表UPSERT函数INSERT ON CONFLICT UPDATE
Args:
model_class: Tortoise ORM 模型类
table_name: 目标表名
table_name: 目标表名 t_material_staging
data_list: 数据列表字段名使用小写格式如materialno
source_system: 来源系统
exclude_fields: 排除的字段列表 datetime 字段
Returns:
插入记录数
插入/更新记录数
"""
from tortoise import Tortoise
from apps.data_opt.staging_cleaner import BUSINESS_KEYS
if exclude_fields is None:
exclude_fields = ['_createtime', '_updatetime', 'sys_date', 'sys_stamp', 'sys_date']
@@ -59,9 +60,15 @@ async def insert_to_staging_table(
# 获取字段映射:Python字段名(小写) -> 数据库字段名(大驼峰)
field_map = {}
field_types = {}
for field in model_class._meta.fields_map.values():
db_col_name = field.source_field if field.source_field else field.model_field_name
field_map[field.model_field_name] = db_col_name
field_types[field.model_field_name] = type(field).__name__
# 获取主键字段
base_table_name = table_name.replace('_staging', '')
pk_fields = BUSINESS_KEYS.get(base_table_name, [])
count = 0
for item in data_list:
@@ -70,17 +77,66 @@ async def insert_to_staging_table(
for key, value in item.items():
if value is not None and key not in exclude_fields:
# key是传入的小写字段名,通过field_map映射到数据库字段名
db_column = field_map.get(key, key)
columns.append(db_column)
# 类型转换
field_type = field_types.get(key, '')
if field_type == 'IntField':
try:
value = int(value)
except (ValueError, TypeError):
pass
elif field_type == 'FloatField':
try:
value = float(value)
except (ValueError, TypeError):
pass
elif field_type == 'DecimalField':
try:
from decimal import Decimal
value = Decimal(str(value))
except:
pass
values.append(value)
placeholders = ", ".join(["$" + str(i+1) for i in range(len(values))])
column_list = ", ".join([f'"{col}"' for col in columns])
query = f'INSERT INTO "{table_name}" ({column_list}) VALUES ({placeholders})'
if pk_fields:
# 构建 ON CONFLICT 子句
pk_columns = [field_map.get(pk, pk) for pk in pk_fields]
conflict_target = ", ".join([f'"{col}"' for col in pk_columns])
# 构建 UPDATE SET 子句(排除主键字段)
update_parts = []
update_values = []
param_offset = len(values) + 1
for i, col in enumerate(columns):
if col in pk_columns:
continue
if col in ['_createtime']:
continue
update_parts.append(f'"{col}" = ${param_offset}')
update_values.append(values[i])
param_offset += 1
# 添加 _updatetime
update_parts.append('"_updatetime" = NOW()')
update_clause = ", ".join(update_parts)
all_values = values + update_values
query = f'''
INSERT INTO "{table_name}" ({column_list}) VALUES ({placeholders})
ON CONFLICT ({conflict_target}) DO UPDATE SET {update_clause}
'''
else:
query = f'INSERT INTO "{table_name}" ({column_list}) VALUES ({placeholders})'
all_values = values
await conn.execute_query(query, tuple(values))
await conn.execute_query(query, tuple(all_values))
count += 1
return count
@@ -125,7 +181,7 @@ async def staging_material(
request: Request,
data: List[Dict] = Body(..., description="物料数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的物料数据,写入缓冲表"""
try:
@@ -149,7 +205,7 @@ async def staging_workcenter(
request: Request,
data: List[Dict] = Body(..., description="工作中心数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的工作中心数据"""
try:
@@ -173,7 +229,7 @@ async def staging_mat_ver(
request: Request,
data: List[Dict] = Body(..., description="产线版本数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的产线版本数据"""
try:
@@ -197,7 +253,7 @@ async def staging_mat_wc(
request: Request,
data: List[Dict] = Body(..., description="工艺路线数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的工艺路线数据"""
try:
@@ -221,7 +277,7 @@ async def staging_mat_wc_bom(
request: Request,
data: List[Dict] = Body(..., description="BOM数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的BOM数据"""
try:
@@ -245,7 +301,7 @@ async def staging_mold(
request: Request,
data: List[Dict] = Body(..., description="模具数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的模具数据"""
try:
@@ -269,7 +325,7 @@ async def staging_mat_wc_mold(
request: Request,
data: List[Dict] = Body(..., description="机台模具关联数据列表"),
source_system: str = Query("unknown", description="来源系统"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""接收外部系统的机台模具关联数据"""
try:
@@ -351,17 +407,130 @@ async def sync_to_production(
table_name: str,
batch_size: int = Query(100, description="每批同步数量"),
max_retries: int = Query(3, description="最大重试次数"),
mode: str = Query("incremental", description="同步模式: incremental-增量, refresh-刷新"),
target_dbs: str = Query(None, description="目标账套列表(逗号分隔)"),
reset_retry: bool = Query(False, description="是否重置重试次数"),
db_name: str = Query(THIS_DB_NAME, description="账套")
):
"""校验通过的缓冲表数据同步到正式表"""
"""将缓冲表数据同步到正式表
Args:
mode:
- incremental: 仅同步校验通过的记录
- refresh: 清空正式表后同步全部记录
target_dbs: 目标账套列表多个用逗号分隔为空则同步到所有账套
reset_retry: 是否重置重试次数将retry_count设为0
"""
try:
from core.settings import MYAPS_DBSET_LIST, MYAPS_MAIN_DB
from tortoise import Tortoise
# 确定目标账套列表
if target_dbs:
target_db_list = [db.strip() for db in target_dbs.split(",") if db.strip()]
else:
target_db_list = MYAPS_DBSET_LIST
if not target_db_list:
raise ValueError("未配置目标账套")
# 重置重试次数
if reset_retry:
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if staging_model:
conn = Tortoise.get_connection(THIS_DB_NAME)
staging_table_name = staging_model._meta.db_table
reset_query = f'UPDATE "{staging_table_name}" SET "_retry_count" = 0 WHERE "_status" = $1'
await conn.execute_query(reset_query, ("validated",))
logger.info(f"已重置重试次数: {staging_table_name}")
processor = StagingProcessor(db_name)
stats = await processor.sync_to_production(table_name, batch_size, max_retries)
# 多账套同步:先同步所有账套,最后统一更新状态
if len(target_db_list) > 1:
all_stats = {}
# 第一步:同步到所有账套(不更新状态)
for target_db in target_db_list:
stats = await processor.sync_to_production(
table_name=table_name,
batch_size=batch_size,
max_retries=max_retries,
mode=mode,
target_db=target_db,
update_status=False # 不更新状态
)
all_stats[target_db] = stats
# 第二步:统一更新缓冲表状态
# 只有全部成功时才标记为synced,否则标记为rejected
total_synced = sum(s.get("synced", 0) for s in all_stats.values())
total_failed = sum(s.get("failed", 0) for s in all_stats.values())
if total_synced > 0 or total_failed > 0:
from tortoise import Tortoise
from datetime import datetime, timezone
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if staging_model:
conn = Tortoise.get_connection(THIS_DB_NAME)
staging_table_name = staging_model._meta.db_table
synced_time = datetime.now(timezone.utc).replace(tzinfo=None)
# 更新成功的记录为synced(只有validated状态且无错误的记录)
if total_synced > 0:
# 先查询需要更新的staging_id
success_ids_query = f'SELECT "_staging_id" FROM "{staging_table_name}" WHERE "_status" = $1 AND "_error_msg" IS NULL LIMIT $2'
success_ids_result = await conn.execute_query(success_ids_query, ("validated", total_synced))
success_ids = [row["_staging_id"] for row in success_ids_result[1]] if success_ids_result[1] else []
if success_ids:
update_query = f'UPDATE "{staging_table_name}" SET "_status" = $1, "_synced_time" = $2 WHERE "_staging_id" = ANY($3)'
await conn.execute_query(update_query, ("synced", synced_time, success_ids))
logger.info(f"已更新成功记录状态: {len(success_ids)}")
# 失败记录已在sync_to_production中标记为rejected,无需再次更新
logger.info(f"同步完成: {staging_table_name}, 成功{total_synced}条, 失败{total_failed}")
else:
# 单账套:同步后立即更新状态
all_stats = {}
for target_db in target_db_list:
stats = await processor.sync_to_production(
table_name=table_name,
batch_size=batch_size,
max_retries=max_retries,
mode=mode,
target_db=target_db,
update_status=True # 立即更新状态
)
all_stats[target_db] = stats
# 汇总统计
total_synced = sum(s.get("synced") or 0 for s in all_stats.values())
total_failed = sum(s.get("failed") or 0 for s in all_stats.values())
total_skipped = sum(s.get("skipped") or 0 for s in all_stats.values())
# 将details转为数组格式
details_list = [
{
"target_db": db_name,
"synced": stats.get("synced") or 0,
"failed": stats.get("failed") or 0,
"skipped": stats.get("skipped") or 0
}
for db_name, stats in all_stats.items()
]
return standard_response(
success=1,
message=f"同步完成",
data=stats
message=f"同步完成: {len(target_db_list)}个账套, 成功{total_synced}条, 失败{total_failed}",
data={
"target_dbs": target_db_list,
"total_synced": total_synced,
"total_failed": total_failed,
"total_skipped": total_skipped,
"details": details_list
}
)
except Exception as e:
logger.error(f"同步失败 [{table_name}]: {str(e)}")
@@ -444,6 +613,16 @@ async def get_validation_errors(
return standard_response(success=0, message=str(e))
@rt.get("/dblist", summary="获取账套列表")
async def get_db_list():
"""获取可用的账套列表"""
return standard_response(
success=1,
message="查询成功",
data=MYAPS_DBSET_LIST
)
@rt.get("/status/{table_name}", summary="获取缓冲表状态统计")
async def get_staging_status(
request: Request,
@@ -451,10 +630,18 @@ async def get_staging_status(
):
"""获取指定缓冲表的状态统计"""
try:
import sys
from tortoise import Tortoise
from core.settings import THIS_DB_NAME
staging_model = STAGING_MODEL_MAPPING.get(table_name)
if not staging_model:
raise ValueError(f"未知的缓冲表: {table_name}")
# 使用原生SQL查询,确保与同步查询条件一致
conn = Tortoise.get_connection(THIS_DB_NAME)
table_name_staging = staging_model._meta.db_table
stats = {}
for status in StagingStatus:
count = await staging_model.filter(_status=status).count()
@@ -462,14 +649,25 @@ async def get_staging_status(
stats["total"] = sum(stats.values())
# 额外统计:retry_count >= 3 的记录数
retry_exceeded_result = await conn.execute_query(
f'SELECT COUNT(*) as cnt FROM "{table_name_staging}" WHERE "_retry_count" >= $1',
(3,)
)
retry_exceeded = retry_exceeded_result[1][0]["cnt"] if retry_exceeded_result[1] else 0
stats["retry_exceeded"] = retry_exceeded
return standard_response(
success=1,
message="查询成功",
data=stats
)
except Exception as e:
logger.error(f"查询状态统计失败: {str(e)}")
return standard_response(success=0, message=str(e))
import traceback
error_detail = f"{type(e).__name__}: {str(e)}" if str(e) else type(e).__name__
logger.error(f"查询状态统计失败: {error_detail}")
logger.error(traceback.format_exc())
return standard_response(success=0, message=error_detail)
@rt.patch("/approve/{table_name}/{staging_id}", summary="审批缓冲表数据")
@@ -477,7 +675,7 @@ async def approve_staging(
request: Request,
table_name: str,
staging_id: int,
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""手动审批通过缓冲表记录"""
try:
@@ -501,7 +699,6 @@ async def reject_staging(
table_name: str,
staging_id: int,
reason: str = Query(..., description="拒绝原因"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
):
"""手动拒绝缓冲表记录"""
try:
@@ -511,7 +708,14 @@ async def reject_staging(
record = await staging_model.get(_staging_id=staging_id)
record._status = StagingStatus.REJECTED
record._error_msg = reason
error_json = json.dumps([{
"staging_id": staging_id,
"error_type": "manual_reject",
"error_field": None,
"error_value": None,
"error_message": reason
}], ensure_ascii=False)
record._error_msg = error_json
await record.save()
return standard_response(success=1, message="已拒绝")
@@ -670,7 +874,7 @@ async def retry_failed_records(
request: Request,
table_name: str,
max_retry: int = Query(3, description="最大重试次数"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""重试同步失败的记录"""
try:
@@ -708,7 +912,7 @@ async def upload_excel(
file: UploadFile = File(..., description="Excel文件"),
source_system: str = Query("excel", description="来源系统"),
dedup_strategy: str = Query("skip", description="去重策略: overwrite/skip/reject"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""上传Excel文件并导入缓冲表,支持去重"""
try:
@@ -936,9 +1140,11 @@ async def batch_update_staging(
conn = Tortoise.get_connection(THIS_DB_NAME)
field_mapping = {}
field_types = {}
for field in staging_model._meta.fields_map.values():
db_col_name = field.source_field if field.source_field else field.model_field_name
field_mapping[field.model_field_name] = db_col_name
field_types[field.model_field_name] = type(field).__name__
set_clauses = []
params = []
@@ -949,6 +1155,23 @@ async def batch_update_staging(
if value is None:
set_clauses.append(f'"{db_field}" = NULL')
else:
field_type = field_types.get(python_field, '')
if field_type == 'IntField':
try:
value = int(value)
except (ValueError, TypeError):
pass
elif field_type == 'FloatField':
try:
value = float(value)
except (ValueError, TypeError):
pass
elif field_type == 'DecimalField':
try:
from decimal import Decimal
value = Decimal(str(value))
except:
pass
set_clauses.append(f'"{db_field}" = ${param_idx}')
params.append(value)
param_idx += 1
@@ -1019,7 +1242,7 @@ async def update_staging(
table_name: str,
staging_id: int,
data: Dict = Body(..., description="更新数据"),
db_name: str = Query(MYAPS_MAIN_DB, description="账套")
# db_name: str = Query(MYAPS_MAIN_DB, description="账套") # 未使用,已注释
):
"""更新单条缓冲表记录"""
try:
@@ -1033,9 +1256,11 @@ async def update_staging(
conn = Tortoise.get_connection(THIS_DB_NAME)
field_map = {}
field_types = {}
for field in staging_model._meta.fields_map.values():
db_col_name = field.source_field if field.source_field else field.model_field_name
field_map[field.model_field_name] = db_col_name
field_types[field.model_field_name] = type(field).__name__
set_parts = []
values = []
@@ -1048,6 +1273,23 @@ async def update_staging(
if value is None or value == '':
set_parts.append(f'"{db_col}" = NULL')
else:
field_type = field_types.get(key, '')
if field_type == 'IntField':
try:
value = int(value)
except (ValueError, TypeError):
pass
elif field_type == 'FloatField':
try:
value = float(value)
except (ValueError, TypeError):
pass
elif field_type == 'DecimalField':
try:
from decimal import Decimal
value = Decimal(str(value))
except:
pass
set_parts.append(f'"{db_col}" = ${param_idx}')
values.append(value)
param_idx += 1
+11 -12
View File
@@ -188,9 +188,9 @@ class ProtoDemand(TortoiseBaseModel):
apiex_sn = fields.CharField(source_field='ApiEx_SN', max_length=32, blank=True, null=True) # Field name made lowercase.
apiex_id = fields.CharField(source_field='ApiEx_ID', max_length=32, blank=True, null=True) # Field name made lowercase.
apiex_entryid = fields.CharField(source_field='ApiEx_EntryID', max_length=32, blank=True, null=True) # Field name made lowercase.
sys_date = fields.DatetimeField(source_field='Sys_Date', blank=True, null=True, auto_now_add=True) # Field name made lowercase.
sys_date = fields.DatetimeField(source_field='Sys_Date', blank=True, null=True) # Field name made lowercase.
sys_user = fields.CharField(source_field='Sys_User', max_length=32, blank=True, null=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', auto_now=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', blank=True, null=True) # Field name made lowercase.
class Meta:
abstract = True
@@ -379,7 +379,7 @@ class ProtoMatWc(TortoiseBaseModel):
offsetsec = fields.IntField(source_field='OffSetSec', blank=True, null=True) # Field name made lowercase.
rate = fields.FloatField(source_field='Rate', blank=True, null=True, description='配比') # Field name made lowercase.
memo = fields.CharField(source_field='Memo', max_length=255, blank=True, null=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', auto_now=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', blank=True, null=True) # Field name made lowercase.
class Meta:
abstract = True
@@ -400,7 +400,7 @@ class ProtoMatWcBom(TortoiseBaseModel):
scrap = fields.FloatField(source_field='Scrap', blank=True, null=True, description='%') # Field name made lowercase.
alt = fields.CharField(source_field='Alt', max_length=1, blank=True, null=True, description='Y/N是否是替代') # Field name made lowercase.
memo = fields.CharField(source_field='Memo', max_length=255, blank=True, null=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', auto_now=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', blank=True, null=True) # Field name made lowercase.
class Meta:
abstract = True
@@ -510,9 +510,8 @@ class ProtoMaterial(TortoiseBaseModel):
free3 = fields.CharField(source_field='Free3', max_length=255, blank=True, null=True) # Field name made lowercase.
memo = fields.CharField(source_field='Memo', max_length=255, blank=True, null=True) # Field name made lowercase.
sys_user = fields.CharField(source_field='Sys_User', max_length=32, blank=True, null=True) # Field name made lowercase.
sys_date = fields.DatetimeField(source_field='Sys_Date', auto_now_add=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', auto_now=True) # Field name made lowercase.
sys_date = fields.DatetimeField(source_field='Sys_Date', blank=True, null=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', blank=True, null=True)
class Meta:
abstract = True
table = 't_material'
@@ -1096,11 +1095,11 @@ class ProtoSupply(TortoiseBaseModel):
free3 = fields.CharField(source_field='Free3', max_length=255, blank=True, null=True) # Field name made lowercase.
apiex_sn = fields.CharField(source_field='ApiEx_SN', max_length=32, blank=True, null=True) # Field name made lowercase.
apiex_id = fields.CharField(source_field='ApiEx_ID', max_length=32, blank=True, null=True) # Field name made lowercase.
apiex_entryid = fields.CharField(source_field='ApiEx_EntryID', max_length=32, blank=True, null=True) # Field name made lowercase.
memo = fields.CharField(source_field='Memo', max_length=255, blank=True, null=True) # Field name made lowercase.
sys_date = fields.DatetimeField(source_field='Sys_Date', blank=True, null=True, auto_now_add=True) # Field name made lowercase.
sys_user = fields.CharField(source_field='Sys_User', max_length=32, blank=True, null=True) # Field name made lowercase.
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', auto_now=True) # Field name made lowercase.
apiex_entryid = fields.CharField(source_field='ApiEx_EntryID', max_length=32, blank=True, null=True)
memo = fields.CharField(source_field='Memo', max_length=255, blank=True, null=True)
sys_date = fields.DatetimeField(source_field='Sys_Date', blank=True, null=True)
sys_user = fields.CharField(source_field='Sys_User', max_length=32, blank=True, null=True)
sys_stamp = fields.DatetimeField(source_field='Sys_Stamp', blank=True, null=True)
class Meta:
abstract = True
+1 -1
View File
@@ -58,7 +58,7 @@ class AcceptMaterial(BaseModel):
abc: gc.AbcEnum = Field(..., example="A", description="ABC分类")
unit: str = Field(..., description='单位', example="PCS")
price: Decimal = Field(0, description="价格", ge=0, example=100.50)
groupno: str = Field(..., description="型号", example="G001")
groupno: str = Field("", description="型号", example="G001")
type: gc.EfEnum = Field(... if MYAPS_VERSION == 'P' else None, example="E", description="物料类型 E-自制件 F-采购件")
phantom: gc.YesNoEnum = Field(pdv.MAT_PHANTOM, example="N", description='虚拟件')
phantommin: int = Field(pdv.MAT_PHANTOMMIN, ge=0, description='虚拟时间(Minute)', example=0)
+14 -14
View File
@@ -887,10 +887,10 @@ async def db_bupsert(
use_orm_or_sql=use_orm_or_sql
)
batch_create = result.get("inserted", 0)
batch_update = result.get("updated", 0)
db_create_count += batch_create
db_update_count += batch_update
batch_create = result.get("inserted") or 0
batch_update = result.get("updated") or 0
db_create_count = (db_create_count or 0) + batch_create
db_update_count = (db_update_count or 0) + batch_update
logger.insert("账套批次生效", f"{db_name} (批次{i+1}/{total_batches})", f"新增{batch_create}条,修改{batch_update}")
except Exception as batch_error:
db_errors.append({"batch": i + 1, "error": str(batch_error), "skipped_count": len(batch_data)})
@@ -921,9 +921,9 @@ async def db_bupsert(
})
break
create_count_total += db_create_count
update_count_total += db_update_count
logger.insert("账套生效", db_name, f"新增{db_create_count}条,修改{db_update_count}")
create_count_total = (create_count_total or 0) + (db_create_count or 0)
update_count_total = (update_count_total or 0) + (db_update_count or 0)
logger.insert("账套生效", db_name, f"新增{db_create_count or 0}条,修改{db_update_count or 0}")
db_results.append({
"db_name": db_name,
"success": db_success,
@@ -957,16 +957,16 @@ async def db_bupsert(
data=data_list,
message=f"生效{len(db_results)}个账套,总计新增{create_count_total}条,修改{update_count_total}条,但存在错误",
meta={
"affected_rows": create_count_total + update_count_total,
"created_rows": create_count_total,
"updated_rows": update_count_total,
"affected_rows": (create_count_total or 0) + (update_count_total or 0),
"created_rows": create_count_total or 0,
"updated_rows": update_count_total or 0,
"origin_total": origin_total,
"distinct_total": len(processed_data_list),
"db_names": valid_dbs,
"table_name": table_name,
"has_errors": True,
"error_summary": error_summary,
"total_skipped_count": sum(r["skipped_count"] for r in db_results)
"total_skipped_count": sum(r.get("skipped_count") or 0 for r in db_results)
}
)
@@ -975,9 +975,9 @@ async def db_bupsert(
data=data_list,
message=f"生效{len(db_results)}个账套,总计新增{create_count_total}条,修改{update_count_total}",
meta={
"affected_rows": create_count_total + update_count_total,
"created_rows": create_count_total,
"updated_rows": update_count_total,
"affected_rows": (create_count_total or 0) + (update_count_total or 0),
"created_rows": create_count_total or 0,
"updated_rows": update_count_total or 0,
"origin_total": origin_total,
"distinct_total": len(processed_data_list),
"db_names": valid_dbs,
+351 -1
View File
@@ -1082,10 +1082,360 @@ async def batch_update_staging(request: Request, table_name: str, data: dict = B
| 2026-05-12 | v2.1 | 新增:校验阶段自动填充默认值、前端全选按钮功能 |
| 2026-05-12 | v2.2 | 修复:单条编辑空字符串处理、必填字段清空校验、校验循环处理、枚举校验完善、编辑后状态重置 |
| 2026-05-12 | v2.3 | 优化:校验错误详情展示、datetime时区修复、枚举标签样式、校验进度条动画 |
| 2026-05-12 | v2.4 | 优化:双击编辑、错误呼吸动画、同步模式选择、数据类型转换、UPSERT机制 |
| 2026-05-13 | v2.5 | 重大更新:多账套同步支持、前端内存泄漏修复、None值累加全面修复、错误信息格式统一、刷新同步逻辑优化 |
---
## 九、v2.3版本更新详情
## 九、v2.5版本更新详情
### 9.1 多账套同步支持
**新增功能**:支持将数据同步到多个账套(数据库)。
**前端实现**
- 同步模式对话框增加账套选择
- 默认全选所有账套,可取消勾选排除
**后端实现**
- `sync_to_production` 新增 `update_status` 参数
- 多账套同步策略:
- 先同步所有账套(`update_status=False`
- 最后统一更新缓冲表状态
**代码示例**
```python
# staging_routers.py
if len(target_db_list) > 1:
# 第一步:同步到所有账套(不更新状态)
for target_db in target_db_list:
stats = await processor.sync_to_production(
table_name=table_name,
target_db=target_db,
update_status=False # 不更新状态
)
# 第二步:统一更新缓冲表状态
update_query = f'UPDATE "{staging_table_name}" SET "_status" = $1 WHERE "_status" = $2'
await conn.execute_query(update_query, ("synced", "validated"))
```
### 9.2 前端内存泄漏修复
**问题1Blob URL未释放**
```javascript
// 修复前
link.href = URL.createObjectURL(blob);
link.click();
// Blob URL泄漏!
// 修复后
const blobUrl = URL.createObjectURL(blob);
link.href = blobUrl;
link.click();
setTimeout(() => URL.revokeObjectURL(blobUrl), 100);
```
**问题2Tooltip DOM未删除**
```javascript
// 修复前
cell.addEventListener('mouseleave', (e) => {
e.target._tooltip.style.display = 'none'; // 只隐藏
});
// 修复后
cell.addEventListener('mouseleave', (e) => {
e.target._tooltip.remove(); // 删除DOM
e.target._tooltip = null;
});
```
### 9.3 None值累加全面修复
**问题根源**Python的 `dict.get("key", 0)` 在值为 `None` 时返回 `None` 而非默认值。
**错误写法**
```python
value = dict.get("key", 0) + 1 # 如果值是None → TypeError!
```
**正确写法**
```python
value = (dict.get("key") or 0) + 1 # 先取值,再处理None
```
**修复位置**
| 文件 | 修复内容 |
|------|----------|
| `db_operation.py` | 批次累加、账套累加、affected_rows计算 |
| `db_manager.py` | `total_inserted``total_updated` |
| `staging_cleaner.py` | `stats["failed"]``retry_count` |
| `staging_routers.py` | `total_synced``total_failed` |
| `event_aggregator.py` | `low_queue_count` |
### 9.4 错误信息格式统一
**问题**`_error_msg` 字段格式不统一,前端解析失败。
**统一格式**
```json
[
{
"staging_id": 123,
"error_type": "schema_error",
"error_field": "groupno",
"error_value": null,
"error_message": "Input should be a valid string"
}
]
```
**前端容错**
```javascript
// data-table.js
parseErrorFields(row) {
try {
let errorData = row._error_msg;
if (typeof errorData === 'string') {
errorData = JSON.parse(errorData);
}
if (!Array.isArray(errorData)) {
errorData = [errorData];
}
// ...
} catch (e) {
errorMap['_error'] = { type: 'parse_error', message: '错误信息格式异常' };
}
}
```
### 9.5 刷新同步逻辑优化
**问题**:前端循环调用同步API,每次都执行TRUNCATE,导致数据被清空。
**修复方案**
| 模式 | 前端调用策略 |
|------|-------------|
| 增量模式 | 循环调用直到没有数据 |
| 刷新模式 | 只调用一次 |
**代码示例**
```javascript
// material.js
if (mode === 'refresh') {
// 刷新模式:一次性同步
const syncResponse = await callApi(`/sync/${TABLE_NAME}?mode=${mode}...`);
// 直接显示结果
} else {
// 增量模式:循环调用
while (true) {
const syncResponse = await callApi(`/sync/${TABLE_NAME}?mode=${mode}...`);
if (batchSynced === 0 && batchFailed === 0) break;
}
}
```
### 9.6 默认值填充增强
**问题**:Schema中部分字段默认值为 `None`,填充函数未处理。
**修复**`fill_defaults` 函数增强:
```python
def fill_defaults(table_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
# 遍历所有字段
for field_name, field_info in schema_class.model_fields.items():
current_value = result.get(field_name)
if current_value in NONE_AND_EMPTY:
# 优先使用 SCHEMA_DEFAULTS
if field_name in defaults and defaults[field_name] is not None:
result[field_name] = defaults[field_name]
else:
# 从 Field.default 获取
field_default = field_info.default
if field_default is not None:
result[field_name] = field_default
# 特殊处理:可选字符串 → ""
elif field_name in ['size', 'planitem', 'memo']:
result[field_name] = ""
# 特殊处理:可选数值 → 0.0
elif field_name in ['lotmin', 'lotmax']:
result[field_name] = 0.0
```
---
## 十、v2.4版本更新详情
### 9.1 双击编辑触发
**修改内容**:单击改双击打开编辑弹窗。
```javascript
// data-table.js
tbody.querySelectorAll('.table-row').forEach(tr => {
tr.addEventListener('dblclick', (e) => { // click → dblclick
// 打开编辑弹窗
});
tr.style.cursor = 'pointer';
tr.title = '双击编辑';
});
```
### 9.2 错误字段呼吸动画
**动画参数**
| 元素 | 周期 | 效果 |
|------|------|------|
| 枚举错误标签 | 3s | 背景色 + 外发光 |
| 错误单元格 | 3s | 背景色 + 边框 + 外发光 |
| 校验失败行 | 4s | 背景色渐变 |
**CSS动画**
```css
@keyframes error-cell-breathe {
0%, 100% {
background-color: #ffe6e6;
border-color: #dc3545;
box-shadow: 0 0 0 rgba(220, 53, 69, 0);
}
50% {
background-color: #ffcccc;
border-color: #ff6666;
box-shadow: 0 0 6px rgba(220, 53, 69, 0.4);
}
}
@keyframes row-rejected-breathe {
0%, 100% { background-color: #fff8f8; }
50% { background-color: #fff0f0; }
}
```
### 9.3 同步模式选择
**两种模式**
| 模式 | 说明 | 数据范围 |
|------|------|----------|
| 增量同步 | 仅同步校验通过的新数据 | validated 状态 |
| 刷新同步 | 清空正式表后重新同步 | validated 状态 |
**前端对话框**
```javascript
function showSyncModeDialog(validatedCount) {
// 显示两种模式选项
// 增量同步:保留正式表现有数据
// 刷新同步:删除正式表所有数据
}
```
**后端实现**
```python
async def sync_to_production(..., mode: str = "incremental"):
if mode == "refresh":
# 清空正式表
await mysql_conn.execute_query(f'TRUNCATE TABLE `{target_table_name}`')
# 无论哪种模式,都只同步 validated 状态
query = f'SELECT * FROM "{staging_table_name}" WHERE "_status" = $1'
```
### 9.4 数据类型自动转换
**问题**:前端表单提交都是字符串,数据库需要正确类型。
**修复**:后端根据字段类型自动转换。
```python
field_types = {}
for field in model_class._meta.fields_map.values():
field_types[field.model_field_name] = type(field).__name__
for key, value in data.items():
field_type = field_types.get(key, '')
if field_type == 'IntField':
value = int(value)
elif field_type == 'FloatField':
value = float(value)
elif field_type == 'DecimalField':
value = Decimal(str(value))
```
**影响范围**
- POST接收数据(insert_to_staging_table
- 单条编辑(update_staging
- 批量编辑(batch_update_staging
### 9.5 UPSERT机制
**问题**:POST接收数据时,已存在记录会主键冲突报错。
**修复**:改为 INSERT ON CONFLICT UPDATE。
```python
query = f'''
INSERT INTO "{table_name}" ({column_list}) VALUES ({placeholders})
ON CONFLICT ({conflict_target}) DO UPDATE SET {update_clause}
'''
```
**各表主键**
| 表名 | 主键字段 |
|------|----------|
| t_material | materialno |
| t_workcenter | workcenter |
| t_mat_ver | materialno, matver |
| t_mat_wc | materialno, matver, itemno |
| t_mat_wc_bom | productno, matver, itemno, materialno |
| t_mold | moldno |
| t_mat_wc_mold | materialno, workcenter, itemno, moldno |
**行为变化**
| 场景 | 之前 | 现在 |
|------|------|------|
| 新记录 | INSERT,状态 pending | INSERT,状态 pending |
| 已存在记录 | ❌ 主键冲突 | UPDATE,状态 pending |
| 已校验记录被覆盖 | 不可能 | 状态重置为 pending |
**UPDATE字段**
- 所有业务字段
- `_source_system`
- `_status` → 'pending'
- `_updatetime` → NOW()
**不更新**
- `_staging_id`
- `_createtime`
### 9.6 修改文件清单
| 文件 | 修改内容 |
|------|----------|
| `staging_routers.py` | UPSERT、类型转换 |
| `staging_cleaner.py` | 同步模式参数、原生SQL |
| `data-table.js` | 双击编辑、enumFields |
| `material.js` | 同步模式对话框、进度条 |
| `custom.css` | 呼吸动画 |
---
## 十、v2.3版本更新详情
### 9.1 校验错误详情展示
+17 -14
View File
@@ -993,6 +993,7 @@ class DbManager:
# - 未改变:影响行数 = 0
# 使用实际处理的数据行数(len(batch))代替batch_size,因为可能有重复数据
actual_size = len(batch)
affected = affected or 0
updated = max(0, affected - actual_size)
inserted = affected - 2 * updated
# 确保插入数量为非负数
@@ -1001,11 +1002,13 @@ class DbManager:
# 对于 INSERT IGNORE:
# - 成功插入:影响行数 = 1
# - 忽略冲突:影响行数 = 0
inserted = affected
inserted = affected or 0
updated = 0
total_inserted += inserted
total_updated += updated
logger.info(f"批量upsert执行成功: 表={table_name}, 批次={i//batch_size + 1}, 影响行数={affected}, 插入={inserted}, 更新={updated}")
total_inserted = (total_inserted or 0) + inserted
total_updated = (total_updated or 0) + updated
self.stats['batches_executed'] += 1
@@ -1019,9 +1022,9 @@ class DbManager:
continue
return {
'inserted': total_inserted,
'updated': total_updated,
'total': total_inserted + total_updated
'inserted': total_inserted or 0,
'updated': total_updated or 0,
'total': (total_inserted or 0) + (total_updated or 0)
}
@@ -1142,9 +1145,9 @@ class DbManager:
inserted_count += 1
return {
'inserted': inserted_count,
'updated': updated_count,
'total': inserted_count + updated_count
'inserted': inserted_count or 0,
'updated': updated_count or 0,
'total': (inserted_count or 0) + (updated_count or 0)
}
@@ -1479,16 +1482,16 @@ class DbManager:
# 影响行数 = 新增行数 + 更新成功的行数
updated = max(0, affected - inserted)
total_inserted += inserted
total_updated += updated
total_inserted = (total_inserted or 0) + inserted
total_updated = (total_updated or 0) + updated
# 移除事务分支,直接执行批次处理
await execute_batch()
return {
'inserted': total_inserted,
'updated': total_updated,
'total': total_inserted + total_updated
'inserted': total_inserted or 0,
'updated': total_updated or 0,
'total': (total_inserted or 0) + (total_updated or 0)
}
+1 -1
View File
@@ -256,7 +256,7 @@ class EventThreadPoolManager:
# 缩容逻辑(需要连续多次检测队列都较低)
elif queue_size < self._scale_down_threshold:
stats['low_queue_count'] = stats.get('low_queue_count', 0) + 1
stats['low_queue_count'] = (stats.get('low_queue_count') or 0) + 1
if stats['low_queue_count'] >= 3 and current_workers > min_workers:
target_workers = max(current_workers - 1, min_workers)
logger.info(f"📉 线程池 {name} 队列空闲({queue_size}),缩容: {current_workers}{target_workers}")
+38 -1
View File
@@ -345,6 +345,18 @@ body {
background-color: #ffcccc;
color: #cc0000;
border: 1px solid #dc3545;
animation: enum-error-breathe 3s ease-in-out infinite;
}
@keyframes enum-error-breathe {
0%, 100% {
background-color: #ffcccc;
box-shadow: 0 0 0 rgba(220, 53, 69, 0);
}
50% {
background-color: #ffe6e6;
box-shadow: 0 0 4px rgba(220, 53, 69, 0.3);
}
}
/* 错误单元格高亮 */
@@ -355,6 +367,20 @@ body {
padding: 0.1rem 0.3rem;
cursor: help;
position: relative;
animation: error-cell-breathe 3s ease-in-out infinite;
}
@keyframes error-cell-breathe {
0%, 100% {
background-color: #ffe6e6;
border-color: #dc3545;
box-shadow: 0 0 0 rgba(220, 53, 69, 0);
}
50% {
background-color: #ffcccc;
border-color: #ff6666;
box-shadow: 0 0 6px rgba(220, 53, 69, 0.4);
}
}
.error-cell.null-cell {
@@ -370,10 +396,21 @@ body {
/* 校验失败行样式 */
.table-row-rejected {
background-color: #fff8f8 !important;
animation: row-rejected-breathe 4s ease-in-out infinite;
}
@keyframes row-rejected-breathe {
0%, 100% {
background-color: #fff8f8;
}
50% {
background-color: #fff0f0;
}
}
.table-row-rejected:hover {
background-color: #fff0f0 !important;
background-color: #ffe8e8 !important;
animation-play-state: paused;
}
/* 错误提示框 */
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据清洗管理系统</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
<style>
body {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+4 -1
View File
@@ -248,8 +248,11 @@ function downloadTemplate(tableName) {
let csv = data.map(row => row.join(',')).join('\n');
const blob = new Blob(['\ufeff' + csv], { type: 'text/csv;charset=utf-8;' });
const blobUrl = URL.createObjectURL(blob);
const link = document.createElement('a');
link.href = URL.createObjectURL(blob);
link.href = blobUrl;
link.download = `${tableName}_template.csv`;
link.click();
setTimeout(() => URL.revokeObjectURL(blobUrl), 100);
}
+23 -6
View File
@@ -223,7 +223,7 @@ class DataTable {
});
tbody.querySelectorAll('.table-row').forEach(tr => {
tr.addEventListener('click', (e) => {
tr.addEventListener('dblclick', (e) => {
if (e.target.type === 'checkbox') return;
const id = parseInt(tr.dataset.id);
const rowData = this.data.find(r => r._staging_id === id);
@@ -231,6 +231,8 @@ class DataTable {
this.onRowClick(rowData);
}
});
tr.style.cursor = 'pointer';
tr.title = '双击编辑';
});
}
@@ -238,8 +240,14 @@ class DataTable {
const errorMap = {};
if (row._status === 'rejected' && row._error_msg) {
try {
const errors = JSON.parse(row._error_msg);
errors.forEach(err => {
let errorData = row._error_msg;
if (typeof errorData === 'string') {
errorData = JSON.parse(errorData);
}
if (!Array.isArray(errorData)) {
errorData = [errorData];
}
errorData.forEach(err => {
if (err.error_field) {
errorMap[err.error_field] = {
type: err.error_type,
@@ -248,7 +256,11 @@ class DataTable {
}
});
} catch (e) {
console.error('解析错误信息失败:', e);
console.error('解析错误信息失败:', e, '原始数据:', row._error_msg);
errorMap['_error'] = {
type: 'parse_error',
message: typeof row._error_msg === 'string' ? row._error_msg : '错误信息格式异常'
};
}
}
return errorMap;
@@ -306,6 +318,9 @@ class DataTable {
}
bindTooltip() {
const existingTooltips = document.querySelectorAll('.error-tooltip');
existingTooltips.forEach(t => t.remove());
document.querySelectorAll('.error-cell').forEach(cell => {
cell.addEventListener('mouseenter', (e) => {
const errorType = e.target.dataset.errorType;
@@ -325,7 +340,8 @@ class DataTable {
cell.addEventListener('mouseleave', (e) => {
if (e.target._tooltip) {
e.target._tooltip.style.display = 'none';
e.target._tooltip.remove();
e.target._tooltip = null;
}
});
});
@@ -360,7 +376,8 @@ class DataTable {
cell.addEventListener('mouseleave', (e) => {
if (e.target._tooltip) {
e.target._tooltip.style.display = 'none';
e.target._tooltip.remove();
e.target._tooltip = null;
}
});
});
+190 -10
View File
@@ -25,8 +25,8 @@ const TABLE_COLUMNS = [
{ field: 'type', title: '类型', width: '50px' },
{ field: 'phantom', title: '虚拟件', width: '60px' },
{ field: 'phantommin', title: '虚拟时间', width: '70px' },
{ field: 'firmday', title: '固定天', width: '60px' },
{ field: 'daygap', title: '拆分天', width: '60px' },
{ field: 'firmday', title: '固定天', width: '60px' },
{ field: 'daygap', title: '拆分天', width: '60px' },
{ field: 'candelay', title: '可延迟', width: '60px' },
{ field: 'lotsize', title: '批量策略', width: '70px' },
{ field: 'lotfix', title: '固定批', width: '60px' },
@@ -357,17 +357,197 @@ async function validateAllData() {
}
async function syncData() {
showLoading();
const response = await callApi(`/status/${TABLE_NAME}`);
if (response.success !== 1) {
showMessage('获取状态失败', 'danger');
return;
}
const response = await callApi(`/sync/${TABLE_NAME}`, 'POST');
const stats = response.data;
const validatedCount = stats.validated || 0;
const retryExceeded = stats.retry_exceeded || 0;
hideLoading();
if (validatedCount === 0) {
showMessage('没有校验通过的记录可同步', 'warning');
return;
}
handleResponse(response, (data) => {
const stats = data.data;
showMessage(`同步完成: 成功${stats.synced}条, 失败${stats.failed}`, 'success');
dataTable.refresh();
statusCard.refresh();
// 如果有超过重试次数的记录,询问是否重置
let resetRetry = false;
if (retryExceeded > 0) {
resetRetry = confirm(`${retryExceeded}条记录的重试次数已达上限,是否重置重试次数后同步?\n\n点击"确定"重置并同步,点击"取消"跳过这些记录`);
}
const { mode, targetDbs } = await showSyncModeDialog(validatedCount);
if (!mode || !targetDbs || targetDbs.length === 0) return;
const targetDbParam = targetDbs.join(',');
const totalCount = validatedCount * targetDbs.length;
showProgress(mode === 'incremental' ? '增量同步中' : '刷新同步中', totalCount);
let totalSynced = 0;
let totalFailed = 0;
let processed = 0;
// 构建API URL
const baseUrl = `/sync/${TABLE_NAME}?batch_size=200&mode=${mode}&target_dbs=${encodeURIComponent(targetDbParam)}&reset_retry=${resetRetry}`;
// 刷新模式只调用一次,增量模式循环调用
if (mode === 'refresh') {
// 刷新模式:一次性同步
setProgressIndeterminate(true);
const syncResponse = await callApi(baseUrl, 'POST');
setProgressIndeterminate(false);
if (syncResponse.success !== 1) {
hideProgress();
showMessage(syncResponse.message || '同步失败', 'danger');
} else {
const syncStats = syncResponse.data;
totalSynced = syncStats.total_synced || 0;
totalFailed = syncStats.total_failed || 0;
updateProgress(totalSynced + totalFailed, totalCount, `已处理 ${totalSynced + totalFailed}/${totalCount}`);
// 根据成功/失败显示不同消息
if (totalFailed > 0) {
showMessage(`同步完成: ${targetDbs.length}个账套, 成功${totalSynced}条, 失败${totalFailed}条(部分记录缺少必填字段)`, 'warning');
} else {
showMessage(`同步完成: ${targetDbs.length}个账套, 成功${totalSynced}`, 'success');
}
}
} else {
// 增量模式:循环调用直到没有数据
let firstCall = true;
while (true) {
setProgressIndeterminate(true);
const url = firstCall ? baseUrl : `/sync/${TABLE_NAME}?batch_size=200&mode=${mode}&target_dbs=${encodeURIComponent(targetDbParam)}`;
const syncResponse = await callApi(url, 'POST');
setProgressIndeterminate(false);
firstCall = false;
if (syncResponse.success !== 1) {
hideProgress();
showMessage(syncResponse.message || '同步失败', 'danger');
break;
}
const syncStats = syncResponse.data;
const batchSynced = syncStats.total_synced || 0;
const batchFailed = syncStats.total_failed || 0;
totalSynced += batchSynced;
totalFailed += batchFailed;
processed += batchSynced + batchFailed;
if (processed > 0) {
updateProgress(processed, totalCount, `已处理 ${processed}/${totalCount}`);
}
if (batchSynced === 0 && batchFailed === 0) {
break;
}
await sleep(50);
}
// 根据成功/失败显示不同消息
if (totalFailed > 0) {
showMessage(`同步完成: ${targetDbs.length}个账套, 成功${totalSynced}条, 失败${totalFailed}条(部分记录缺少必填字段)`, 'warning');
} else {
showMessage(`同步完成: ${targetDbs.length}个账套, 成功${totalSynced}`, 'success');
}
}
hideProgress();
dataTable.refresh();
statusCard.refresh();
}
async function showSyncModeDialog(validatedCount) {
// 获取账套列表
const dbListResponse = await callApi('/dblist');
const dbList = dbListResponse.success === 1 ? dbListResponse.data : [];
return new Promise((resolve) => {
const modalHtml = `
<div class="modal fade" id="syncModeModal" tabindex="-1">
<div class="modal-dialog modal-dialog-centered">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">选择同步模式</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<div class="modal-body">
<div class="mb-3">
<label class="form-label fw-bold">目标账套</label>
<div class="border rounded p-2" style="max-height: 150px; overflow-y: auto;">
${dbList.map((db, idx) => `
<div class="form-check">
<input class="form-check-input target-db-checkbox" type="checkbox" id="targetDb_${idx}" value="${db}" checked>
<label class="form-check-label" for="targetDb_${idx}">${db}</label>
</div>
`).join('')}
</div>
<div class="form-text">默认全选可取消勾选排除不需要同步的账套</div>
</div>
<hr>
<div class="mb-3">
<label class="form-label fw-bold">同步模式</label>
<div class="form-check">
<input class="form-check-input" type="radio" name="syncMode" id="modeIncremental" value="incremental" checked>
<label class="form-check-label" for="modeIncremental">
<strong>增量同步</strong> <span class="badge bg-primary">${validatedCount}</span>
</label>
<div class="text-muted small mt-1">仅同步校验通过的新数据保留正式表现有数据</div>
</div>
</div>
<div class="mb-3">
<div class="form-check">
<input class="form-check-input" type="radio" name="syncMode" id="modeRefresh" value="refresh">
<label class="form-check-label" for="modeRefresh">
<strong>刷新同步</strong> <span class="badge bg-warning text-dark">${validatedCount}</span>
</label>
<div class="text-muted small mt-1">清空正式表后重新同步校验通过的数据</div>
</div>
</div>
<div class="alert alert-warning small mb-0">
<i class="bi bi-exclamation-triangle"></i> <strong></strong>
</div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<button type="button" class="btn btn-primary" id="confirmSyncBtn">开始同步</button>
</div>
</div>
</div>
</div>
`;
document.body.insertAdjacentHTML('beforeend', modalHtml);
const modal = new bootstrap.Modal(document.getElementById('syncModeModal'));
const confirmBtn = document.getElementById('confirmSyncBtn');
confirmBtn.addEventListener('click', () => {
const mode = document.querySelector('input[name="syncMode"]:checked').value;
const targetDbs = Array.from(document.querySelectorAll('.target-db-checkbox:checked')).map(cb => cb.value);
if (targetDbs.length === 0) {
showMessage('请至少选择一个目标账套', 'warning');
return;
}
modal.hide();
resolve({ mode, targetDbs });
});
document.getElementById('syncModeModal').addEventListener('hidden.bs.modal', function() {
this.remove();
resolve({ mode: null, targetDbs: [] });
}, { once: true });
modal.show();
});
}
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>产线版本数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>BOM数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>机台模具关联数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>工艺路线数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
+2 -4
View File
@@ -4,8 +4,7 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>物料数据清洗管理</title>
<!-- Bootstrap CSS (CDN) -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="/static/mds/css/custom.css">
</head>
<body>
@@ -223,8 +222,7 @@
</div>
</div>
<!-- Bootstrap JS (CDN) -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/js/bootstrap.bundle.min.js"></script>
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
<script src="/static/mds/js/common.js"></script>
<script src="/static/mds/js/data-table.js"></script>
<script src="/static/mds/js/status-card.js"></script>
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>模具数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
+2 -1
View File
@@ -4,7 +4,8 @@
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>工作中心数据清洗管理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
<link href="/static/mds/css/bootstrap.min.css" rel="stylesheet">
<script src="/static/mds/js/bootstrap.bundle.min.js"></script>
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">