mirror of
https://github.com/rnvm9wjdtj-bot/myaps_api.git
synced 2026-06-02 05:54:40 +00:00
fix海达
This commit is contained in:
@@ -13,6 +13,7 @@ import inspect
|
||||
from enum import Enum
|
||||
from typing import List, Dict, Optional, Literal, Callable, Union, Any, Type
|
||||
from collections import defaultdict
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
|
||||
from datetime import date, datetime, timedelta
|
||||
@@ -1760,24 +1761,49 @@ class ApsPayloadSponsor:
|
||||
|
||||
|
||||
@classmethod
|
||||
async def add_batchlog(cls, pidno: str, strategy: str, target: Literal['RECE', 'PROC', 'SUCC', 'FAIL'], memo: str=""):
|
||||
async def execute_batchlog(cls, sent_batchlog: Dict[str, Any], handlers: Dict[str, callable] = None):
|
||||
"""批次日志状态执行器
|
||||
|
||||
memo = {
|
||||
"RECE": "Received已接收",
|
||||
"PROC": "Processing处理中",
|
||||
"SUCC": "Success 成功",
|
||||
"FALI": f"Failed {memo}"
|
||||
}.get(target)
|
||||
用法:
|
||||
await ApsPayloadSponsor.execute_batchlog(sent_batchlog, STRATEGY_HANDLERS)
|
||||
|
||||
await TBatchLog.create(
|
||||
systime=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||||
pidno=pidno,
|
||||
task="API",
|
||||
strategy=strategy,
|
||||
target=target,
|
||||
memo=memo
|
||||
)
|
||||
状态流转: RECE -> SUCC (正常) / FAIL (异常)
|
||||
"""
|
||||
handlers = handlers or {}
|
||||
|
||||
async def add_batchlog(pidno: str, strategy: str, target: Literal['RECE', 'PROC', 'SUCC', 'FAIL'], memo: str=""):
|
||||
|
||||
memo = {
|
||||
"RECE": "Received已接收",
|
||||
"PROC": "Processing处理中",
|
||||
"SUCC": "Success 成功",
|
||||
"FAIL": f"Failed {memo}"
|
||||
}.get(target, f"Unknown target: {target}")
|
||||
|
||||
await TBatchLog.create(
|
||||
systime=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||||
pidno=pidno,
|
||||
task="API",
|
||||
strategy=strategy,
|
||||
target=target,
|
||||
memo=memo
|
||||
)
|
||||
|
||||
pidno = sent_batchlog['pidno']
|
||||
strategy = sent_batchlog['strategy']
|
||||
|
||||
await add_batchlog(pidno=pidno, strategy=strategy, target='RECE')
|
||||
try:
|
||||
handler = handlers.get(strategy)
|
||||
if handler:
|
||||
if inspect.iscoroutinefunction(handler):
|
||||
await handler()
|
||||
else:
|
||||
handler()
|
||||
await add_batchlog(pidno=pidno, strategy=strategy, target='SUCC')
|
||||
except Exception as e:
|
||||
await add_batchlog(pidno=pidno, strategy=strategy, target='FAIL', memo=str(e))
|
||||
raise
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -127,6 +127,19 @@ class DatabaseHandler(Handler):
|
||||
Args:
|
||||
records: 日志记录列表
|
||||
"""
|
||||
# 检查 Tortoise ORM 是否真正初始化完成
|
||||
try:
|
||||
from tortoise import Tortoise
|
||||
if not Tortoise._inited:
|
||||
# ORM 未就绪,将记录重新放入缓冲区,等待下次 flush
|
||||
with self._lock:
|
||||
for r in records:
|
||||
if len(self._buffer) < self._buffer_size:
|
||||
self._buffer.append(r)
|
||||
return
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from apps.common.monitor.models import SystemLog
|
||||
from core.settings import SQLITE_FILE
|
||||
@@ -161,7 +174,15 @@ class DatabaseHandler(Handler):
|
||||
try:
|
||||
await SystemLog.bulk_create(logs)
|
||||
except Exception as e:
|
||||
raise e
|
||||
# 数据库写入失败,将记录重新放入缓冲区等待重试
|
||||
error_msg = str(e)
|
||||
if "not initialised" in error_msg.lower() or "configuration" in error_msg.lower():
|
||||
with self._lock:
|
||||
for r in records:
|
||||
if len(self._buffer) < self._buffer_size:
|
||||
self._buffer.append(r)
|
||||
else:
|
||||
raise
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
"""获取统计信息"""
|
||||
|
||||
@@ -6,6 +6,7 @@ import pandas as pd
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Union
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import status
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
@@ -323,15 +324,18 @@ async def batch_handle_pl_status_a2e(event_data_list: List[Dict], _erp: EventRes
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
|
||||
#################################################################################
|
||||
# ⬇️一键通排批次日志
|
||||
#################################################################################
|
||||
|
||||
# strategy -> handler function 映射表
|
||||
_STRATEGY_HANDLERS: Dict[str, callable] = {
|
||||
'库存': refresh_stock,
|
||||
# 添加更多策略处理器...
|
||||
# '采购': refresh_purchase,
|
||||
# '生产': refresh_production,
|
||||
}
|
||||
|
||||
async def batch_handle_new_batchlog(event_data_list: List[Dict]):
|
||||
|
||||
batchlog = event_data_list[0]
|
||||
pidno = batchlog['pidno']
|
||||
strategy = batchlog['strategy']
|
||||
await ApsPayloadSponsor.add_batchlog(pidno=pidno, strategy=strategy, target='RECE')
|
||||
try:
|
||||
if strategy == '库存':
|
||||
await refresh_stock()
|
||||
await ApsPayloadSponsor.add_batchlog(pidno=pidno, strategy=strategy, target='SUCC')
|
||||
except Exception as e:
|
||||
await ApsPayloadSponsor.add_batchlog(pidno=pidno, strategy=strategy, target='FAIL', memo=str(e))
|
||||
await ApsPayloadSponsor.execute_batchlog(event_data_list[0], _STRATEGY_HANDLERS)
|
||||
@@ -206,7 +206,8 @@ def handle_insert_batchlog(database: str, table: str, data: dict):
|
||||
try:
|
||||
new_data = dict_to_lower_keys(data['new'])
|
||||
task = new_data['task']
|
||||
if task == 'API':
|
||||
target = new_data['target']
|
||||
if task == 'API' and target == 'SENT':
|
||||
aps_new_batchlog_event.add_event(new_data)
|
||||
except Exception as e:
|
||||
logger.fail("处理t_batchlog插入事件", "", str(e))
|
||||
|
||||
Reference in New Issue
Block a user