This commit is contained in:
2026-04-19 22:32:21 +08:00
parent 4050df95cb
commit ea32677ba9
9 changed files with 229 additions and 212 deletions
@@ -38,7 +38,7 @@ class HTTPCollector:
logger = log_config.get_logger(__name__)
requests = list(self._collector._requests)
logger.info(f"开始保存请求数据,共 {len(requests)} 个请求")
logger.debug(f"开始保存请求数据,共 {len(requests)} 个请求")
saved_count = 0
for req in requests:
@@ -76,7 +76,8 @@ class HTTPCollector:
except Exception as e:
logger.error(f"保存请求数据失败: {e}")
logger.info(f"请求数据保存完成,共保存 {saved_count} 个请求")
logger.debug(f"请求数据保存完成,共保存 {saved_count} 个请求")
async def get_slow_requests(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
@@ -395,7 +395,8 @@ class OutboundHTTPCollector:
await Tortoise.init(config=TORTOISE_ORM_CONFIG)
await outbound_request_storage.save_request(request_data)
except Exception as e:
print(f"保存对外请求到数据库失败: {e}")
pass
# print(f"保存对外请求到数据库失败: {e}")
finally:
# 不要关闭数据库连接,避免影响其他线程
pass
+1 -1
View File
@@ -156,7 +156,7 @@ class ResourceMonitor:
# 2. 触发垃圾回收
collected = gc.collect()
logger.info(f"垃圾回收完成,回收对象数: {collected}")
logger.debug(f"垃圾回收完成,回收对象数: {collected}")
# 3. 检查并清理其他资源
# 这里可以添加其他资源清理逻辑
+122 -137
View File
@@ -158,6 +158,7 @@ class _ProductionDataCache:
finally:
self._is_loading = False
def _initialize(self, db_name: str):
"""同步版本的初始化方法,使用 API 方式获取数据"""
try:
@@ -194,160 +195,144 @@ class _ProductionDataCache:
logger.fail("生产数据缓存", "", f"同步初始化失败: {e}")
raise
def _fetch_paginated_data(self, url: str, page_size: int = 1000):
"""通用的分页获取数据方法"""
all_data = []
page_index = 1
max_retries = 3
retry_delay = 2 # 秒
while True:
paginated_url = f"{url}&page_index={page_index}&page_size={page_size}"
for attempt in range(max_retries):
try:
response = SESSION.get(paginated_url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result.get('data', [])
all_data.extend(data_list)
# 检查是否还有下一页
total = result.get('meta', {}).get('total', 0)
if len(all_data) >= total:
return all_data
page_index += 1
break
except Exception as e:
logger.warning("生产数据缓存", "", f"分页获取数据失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
else:
# 达到最大重试次数,返回已获取的数据
return all_data
def _build_cache(self, db_name: str, api_endpoint: str, cache_name: str, cache_factory, process_item):
"""通用的缓存构建方法"""
cache = cache_factory()
url = f"{THIS_BASE_URL}{api_endpoint}?db_name={db_name}"
# 使用分页机制获取数据
data_list = self._fetch_paginated_data(url)
for item in data_list:
process_item(item, cache)
self._cache[cache_name] = cache
def _build_supply_mo_cache(self, db_name: str):
"""使用 API 方式构建 supply_mo 缓存"""
cache = {}
max_retries = 3
retry_delay = 2 # 秒
def process_item(item, cache):
supply_no = item.get('supplyno', '')
if supply_no:
cache[supply_no] = item
url = f"{THIS_BASE_URL}/api/v_supply_mo/page?db_name={db_name}"
for attempt in range(max_retries):
try:
response = SESSION.get(url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result['data']
for item in data_list:
supply_no = item.get('supplyno', '')
if supply_no:
cache[supply_no] = item
# 成功获取数据,跳出重试循环
break
except Exception as e:
logger.warning("生产数据缓存", "", f"获取 supply_mo 失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
self._cache['supply_mo'] = cache
self._build_cache(
db_name=db_name,
api_endpoint="/api/v_supply_mo/page",
cache_name="supply_mo",
cache_factory=dict,
process_item=process_item
)
def _build_orderwc_cache(self, db_name: str):
"""使用 API 方式构建 orderwc 缓存(以 supplyno 为索引)"""
cache = defaultdict(list)
max_retries = 3
retry_delay = 2 # 秒
url = f"{THIS_BASE_URL}/api/v_orderwc/page?db_name={db_name}"
def process_item(item, cache):
supply_no = item.get('supplyno', '')
if supply_no:
cache[supply_no].append(item)
for attempt in range(max_retries):
try:
response = SESSION.get(url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result['data']
for item in data_list:
supply_no = item.get('supplyno', '')
if supply_no:
cache[supply_no].append(item)
# 成功获取数据,跳出重试循环
break
except Exception as e:
logger.warning("生产数据缓存", "", f"获取 orderwc 失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
self._cache['orderwc'] = cache
self._build_cache(
db_name=db_name,
api_endpoint="/api/v_orderwc/page",
cache_name="orderwc",
cache_factory=lambda: defaultdict(list),
process_item=process_item
)
def _build_demand_cache(self, db_name: str):
"""使用 API 方式构建 demand 缓存"""
cache = defaultdict(list)
max_retries = 3
retry_delay = 2 # 秒
def process_item(item, cache):
demand_no = item.get('demandno', '')
if demand_no:
cache[demand_no].append(item)
url = f"{THIS_BASE_URL}/api/v_demand/page?db_name={db_name}"
for attempt in range(max_retries):
try:
response = SESSION.get(url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result['data']
for item in data_list:
demand_no = item.get('demandno', '')
if demand_no:
cache[demand_no].append(item)
# 成功获取数据,跳出重试循环
break
except Exception as e:
logger.warning("生产数据缓存", "", f"获取 demand 失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
self._cache['demand'] = cache
self._build_cache(
db_name=db_name,
api_endpoint="/api/v_demand/page",
cache_name="demand",
cache_factory=lambda: defaultdict(list),
process_item=process_item
)
def _build_peg_cache(self, db_name: str):
"""使用 API 方式构建 peg 缓存(双向索引)"""
cache = {
'demand_to_supply': defaultdict(list),
'supply_to_demand': defaultdict(list)
}
max_retries = 3
retry_delay = 2 # 秒
def process_item(item, cache):
demand_no = item.get('demandno', '')
s_supply_no = item.get('s_supplyno', '')
if demand_no and s_supply_no:
if s_supply_no not in cache['demand_to_supply'][demand_no]:
cache['demand_to_supply'][demand_no].append(s_supply_no)
if demand_no not in cache['supply_to_demand'][s_supply_no]:
cache['supply_to_demand'][s_supply_no].append(demand_no)
url = f"{THIS_BASE_URL}/api/v_peg/mini?db_name={db_name}"
def peg_cache_factory():
return {
'demand_to_supply': defaultdict(list),
'supply_to_demand': defaultdict(list)
}
for attempt in range(max_retries):
try:
response = SESSION.get(url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result['data']
for item in data_list:
demand_no = item.get('demandno', '')
s_supply_no = item.get('s_supplyno', '')
if demand_no and s_supply_no:
if s_supply_no not in cache['demand_to_supply'][demand_no]:
cache['demand_to_supply'][demand_no].append(s_supply_no)
if demand_no not in cache['supply_to_demand'][s_supply_no]:
cache['supply_to_demand'][s_supply_no].append(demand_no)
# 成功获取数据,跳出重试循环
break
except Exception as e:
logger.warning("生产数据缓存", "", f"获取 peg 失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
self._cache['peg'] = cache
self._build_cache(
db_name=db_name,
api_endpoint="/api/v_peg/mini",
cache_name="peg",
cache_factory=peg_cache_factory,
process_item=process_item
)
def _build_material_cache(self, db_name: str):
"""使用 API 方式构建 material 缓存"""
cache = {}
max_retries = 3
retry_delay = 2 # 秒
def process_item(item, cache):
material_no = item.get('materialno', '')
if material_no:
cache[material_no] = item
url = f"{THIS_BASE_URL}/api/t_material/page?db_name={db_name}"
for attempt in range(max_retries):
try:
response = SESSION.get(url, timeout=(30, 60))
response.raise_for_status()
result = response.json()
data_list = result['data']
for item in data_list:
material_no = item.get('materialno', '')
if material_no:
cache[material_no] = item
# 成功获取数据,跳出重试循环
break
except Exception as e:
logger.warning("生产数据缓存", "", f"获取 material 失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
self._cache['material'] = cache
self._build_cache(
db_name=db_name,
api_endpoint="/api/t_material/page",
cache_name="material",
cache_factory=dict,
process_item=process_item
)
def establish_production_cache(self, db_name: str):
@@ -932,8 +917,8 @@ class ApsHelpers:
mo_data = cache.get_supply_mo(supplyno)
if mo_data:
all_orderwc = list(cache._cache['orderwc'].values())
mo_data['orderwc'] = [_ for _ in all_orderwc if _['orderno'].startswith(supplyno)]
mo_data['orderwc'] = cache.get_orderwc(supplyno)
if get_origin_so:
vendorno = mo_data.get('vendorno', '')
@@ -944,7 +929,7 @@ class ApsHelpers:
related_demand = cache.get_demand(demand_no=supplyno)
demands_data = [_ for _ in related_demand if _.get('type') != 'SO']
if demands_data:
demands_no = ','.join([f"'{i['demandno']}'" for i in demands_data])
demands_no = [_['demandno'] for _ in demands_data]
peg_query_result = cache.batch_get_peg_by_demand(demands_no)
mo_data['prev_mo'] = cache.batch_get_supply_mo(peg_query_result)
+21 -3
View File
@@ -927,6 +927,9 @@ class MySQLBinlogMonitor:
def _run_handler(self, handler, *args, **kwargs):
"""运行处理器函数,支持同步和异步函数"""
handler_name = getattr(handler, '__name__', str(handler))
start_time = time.time()
try:
result = handler(*args, **kwargs)
# 检查是否是协程对象
@@ -963,13 +966,28 @@ class MySQLBinlogMonitor:
def callback(fut):
try:
fut.result()
exec_time = time.time() - start_time
if exec_time > 5.0:
logger.warning(f"异步处理器 {handler_name} 执行时间过长: {exec_time:.2f}")
elif exec_time > 1.0:
logger.debug(f"异步处理器 {handler_name} 执行时间: {exec_time:.2f}")
except Exception as e:
logger.fail("异步处理器执行", "", str(e))
logger.fail(f"异步处理器 {handler_name} 执行", "", str(e))
future.add_done_callback(callback)
except Exception as e:
logger.fail("异步处理器提交", "", str(e))
logger.fail(f"异步处理器 {handler_name} 提交", "", str(e))
else:
# 同步函数执行完成
exec_time = time.time() - start_time
if exec_time > 5.0:
logger.warning(f"同步处理器 {handler_name} 执行时间过长: {exec_time:.2f}")
elif exec_time > 1.0:
logger.debug(f"同步处理器 {handler_name} 执行时间: {exec_time:.2f}")
except Exception as e:
logger.fail("处理器执行", "", str(e))
logger.fail(f"处理器 {handler_name} 执行", "", str(e))
finally:
# 确保即使出错也能继续处理其他事件
pass
def process_binlog_event(self, event):
+35 -32
View File
@@ -463,9 +463,9 @@ async def get_meta():
include_in_schema=False
)
async def get_material_page(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
page_index: int = Query(0, description="页码"),
page_size: int = Query(1000, description="每页数量"),
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套")
):
db_name = db_name.replace(" ", "")
return await db_query(db_name=db_name, model_or_tablename="t_material", page_index=page_index, page_size=page_size)
@@ -820,19 +820,19 @@ async def delete_supply(
)
async def get_demand_page(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
starttime: str = Query(None, description="需求开始时间"),
endtime: str = Query(None, description="需求截止时间"),
pagesize: int = Query(1000, description="每页数量"),
pageindex: int = Query(0, description="页码"),
start_time: str = Query(None, description="需求开始时间"),
end_time: str = Query(None, description="需求截止时间"),
page_size: int = Query(1000, description="每页数量"),
page_index: int = Query(0, description="页码"),
):
db_name = db_name.replace(" ", "")
filter = []
if starttime:
filter.append(f"`Req_Date` >= '{starttime}'")
if endtime:
filter.append(f"`Req_Date` <= '{endtime}'")
if start_time:
filter.append(f"`Req_Date` >= '{start_time}'")
if end_time:
filter.append(f"`Req_Date` <= '{end_time}'")
filter_string = " AND ".join(filter)
return await db_query(db_name=db_name, model_or_tablename="v_demand", filter_string=filter_string, page_size=pagesize, page_index=pageindex)
return await db_query(db_name=db_name, model_or_tablename="v_demand", filter_string=filter_string, page_size=page_size, page_index=page_index)
@rt.get(
@@ -921,21 +921,21 @@ async def patch_demand(
)
async def get_mo_page(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
starttime: datetime = Query(None, description="工单开工时间"),
endtime: datetime = Query(None, description="工单完工时间"),
pagesize: int = Query(1000, description="每页数量"),
pageindex: int = Query(0, description="页码"),
start_time: datetime = Query(None, description="工单开工时间"),
end_time: datetime = Query(None, description="工单完工时间"),
page_size: int = Query(1000, description="每页数量"),
page_index: int = Query(0, description="页码"),
# x_api_key: str = Header(None, description="API密钥")
):
db_name = db_name.replace(" ", "")
filter = []
if starttime:
filter.append(f"`DT_OrdStart` >= '{starttime}'")
if endtime:
filter.append(f"`DT_OrdEnd` <= '{endtime}'")
if start_time:
filter.append(f"`DT_OrdStart` >= '{start_time}'")
if end_time:
filter.append(f"`DT_OrdEnd` <= '{end_time}'")
filter_string = " AND ".join(filter)
result = await db_query(db_name=db_name, model_or_tablename="v_supply_mo", filter_string=filter_string, page_size=pagesize, page_index=pageindex)
result = await db_query(db_name=db_name, model_or_tablename="v_supply_mo", filter_string=filter_string, page_size=page_size, page_index=page_index)
return result
@@ -1146,20 +1146,20 @@ async def query_workreport(
)
async def get_orderwc_page(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
starttime: datetime = Query(None, description="工序开工时间"),
endtime: datetime = Query(None, description="工序完工时间"),
pagesize: int = Query(1000, description="每页数量"),
pageindex: int = Query(0, description="页码"),
start_time: datetime = Query(None, description="工序开工时间"),
end_time: datetime = Query(None, description="工序完工时间"),
page_size: int = Query(1000, description="每页数量"),
page_index: int = Query(0, description="页码"),
# x_api_key: str = Header(None, description="API密钥")
):
db_name = db_name.replace(" ", "")
filter = []
if starttime:
filter.append(f"`DT_Start` >= '{starttime}'")
if endtime:
filter.append(f"`DT_End` <= '{endtime}'")
if start_time:
filter.append(f"`DT_Start` >= '{start_time}'")
if end_time:
filter.append(f"`DT_End` <= '{end_time}'")
filter_string = " AND ".join(filter)
return await db_query(db_name=db_name, model_or_tablename="v_orderwc", filter_string=filter_string, page_size=pagesize, page_index=pageindex)
return await db_query(db_name=db_name, model_or_tablename="v_orderwc", filter_string=filter_string, page_size=page_size, page_index=page_index)
@rt.get(
@@ -1184,11 +1184,11 @@ async def get_orderwc(
)
async def get_peg_page(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
pagesize: int = Query(1000, description="每页数量"),
pageindex: int = Query(0, description="页码"),
page_size: int = Query(1000, description="每页数量"),
page_index: int = Query(0, description="页码"),
):
db_name = db_name.replace(" ", "")
return await db_query(db_name=db_name, model_or_tablename="v_peg", page_size=pagesize, page_index=pageindex)
return await db_query(db_name=db_name, model_or_tablename="v_peg", page_size=page_size, page_index=page_index)
@rt.get(
@@ -1200,9 +1200,12 @@ async def get_peg_page(
)
async def get_peg_relation(
db_name: str = Query(MYAPS_MAIN_DB, examples={"default": {"value": MYAPS_MAIN_DB}}, description="账套"),
page_size: int = Query(10000, description="每页数量"),
page_index: int = Query(0, description="页码"),
):
db_name = db_name.replace(" ", "")
return await db_exec_sql(db_name=db_name, sql=V_PEG_SQL.format(where_string="1=1"), description="查询需求与供应的匹配关系")
return await db_query(db_name=db_name, model_or_tablename="v_peg", page_size=page_size, page_index=page_index)
# return await db_exec_sql(db_name=db_name, sql=V_PEG_SQL.format(where_string="1=1"), description="查询需求与供应的匹配关系")
+3 -3
View File
@@ -71,14 +71,14 @@ async def websocket_endpoint(websocket: WebSocket, path: str = ""):
"query_params": dict(websocket.query_params),
"headers": {k: v for k, v in websocket.headers.items() if k.lower() not in ['cookie', 'authorization']},
}
log_config.info(f"WebSocket 连接请求: {client_info}")
log_config.debug(f"WebSocket 连接请求: {client_info}")
try:
# 保持连接并接收消息,3秒超时后自动关闭(缩短超时时间)
while True:
try:
message = await asyncio.wait_for(websocket.receive_text(), timeout=60.0)
log_config.info(f"WebSocket 收到消息 [{full_path}]: {message}")
log_config.debug(f"WebSocket 收到消息 [{full_path}]: {message}")
await websocket.send_json({
"status": "received",
"path": full_path,
@@ -87,7 +87,7 @@ async def websocket_endpoint(websocket: WebSocket, path: str = ""):
except asyncio.TimeoutError:
break
except WebSocketDisconnect:
log_config.info(f"WebSocket 客户端断开连接: {client_info['client']} - {full_path}")
log_config.debug(f"WebSocket 客户端断开连接: {client_info['client']} - {full_path}")
except Exception as e:
log_config.warning(f"WebSocket 异常 [{full_path}]: {e}")
finally:
+14 -10
View File
@@ -1339,16 +1339,20 @@ class DbManager:
# 尝试关闭所有连接
try:
from tortoise.connection import connections
for conn_name in connections._connections.keys():
try:
conn = connections.get(conn_name)
if conn and hasattr(conn, 'close'):
try:
await conn.close()
except Exception as close_error:
logger.warning(f"关闭连接 {conn_name} 时出错: {close_error}")
except Exception as get_conn_error:
logger.warning(f"获取连接 {conn_name} 时出错: {get_conn_error}")
# 安全检查:确保connections对象和_connections属性存在
if hasattr(connections, '_connections'):
for conn_name in connections._connections.keys():
try:
conn = connections.get(conn_name)
if conn and hasattr(conn, 'close'):
try:
await conn.close()
except Exception as close_error:
logger.warning(f"关闭连接 {conn_name} 时出错: {close_error}")
except Exception as get_conn_error:
logger.warning(f"获取连接 {conn_name} 时出错: {get_conn_error}")
else:
logger.info("连接池未初始化,跳过关闭操作")
except Exception as close_all_error:
logger.warning(f"关闭所有连接时出错: {close_all_error}")
+28 -23
View File
@@ -161,37 +161,42 @@ from collections import defaultdict
@mysql_monitor.on_update_for_table("t_supply", database=MYAPS_MAIN_DB)
def handle_update_supply(database: str, table: str, data: dict, data_diff: dict):
"""处理t_supply表的更新事件"""
from apps.data_opt.components._base import ApsHelpers, get_production_cache
try:
from apps.data_opt.components._base import ApsHelpers, get_production_cache
data_before = dict_to_lower_keys(data['old'])
type_before = data_before['type']
status_before = data_before['status']
data_now = dict_to_lower_keys(data['new'])
type_now = data_now['type']
status_now = data_now['status']
if type_now == 'PL' and status_now == "A2E" and status_before in ["NEW", "CRE"]:
plno = data_now['supplyno']
data_before = dict_to_lower_keys(data['old'])
type_before = data_before['type']
status_before = data_before['status']
aps_pl_status_a2e_event.add_event(data_now)
elif type_before == 'PL' and type_now == 'MO':
# 当 PL下达成功后,推送领料申请(RS)
aps_pl_typeto_mo_event.add_event(data_now)
data_now = dict_to_lower_keys(data['new'])
type_now = data_now['type']
status_now = data_now['status']
if type_now == 'PL' and status_now == "A2E" and status_before in ["NEW", "CRE"]:
plno = data_now['supplyno']
aps_pl_status_a2e_event.add_event(data_now)
elif type_before == 'PL' and type_now == 'MO':
# 当 PL下达成功后,推送领料申请(RS)
aps_pl_typeto_mo_event.add_event(data_now)
except Exception as e:
logger.fail("处理t_supply更新事件", "", str(e))
@mysql_monitor.on_insert_for_table("t_supply", database=MYAPS_MAIN_DB)
def handle_insert_supply(database: str, table: str, data: dict):
"""处理t_supply表的插入事件"""
from apps.data_opt.components._base import ApsHelpers
try:
from apps.data_opt.components._base import ApsHelpers
new_data = dict_to_lower_keys(data['new'])
type_ = new_data['type']
# status_now = new_data['status']
new_data = dict_to_lower_keys(data['new'])
type_ = new_data['type']
# status_now = new_data['status']
if type_ == 'PR':
aps_pr_created_event.add_event(new_data)
if type_ == 'PR':
aps_pr_created_event.add_event(new_data)
except Exception as e:
logger.fail("处理t_supply插入事件", "", str(e))