优化租户提示器

This commit is contained in:
2026-05-08 15:08:36 +08:00
parent 1ab48cc102
commit ec31142ce1
14 changed files with 83 additions and 268 deletions
+11 -11
View File
@@ -8,7 +8,7 @@ import asyncio
import os
import time
from typing import Dict, Any, Optional
from globalobjects import logger as log_config, AlertType, alert_manager
from globalobjects import logger as log_config, RemindType, remind_manager
from .collectors import DatabaseCollector
from .failed_operation_recovery import FailedOperationRecovery
@@ -138,20 +138,20 @@ class DatabaseHealthChecker:
if unhealthy_count > 0:
alert_key = f"db_unhealthy_{unhealthy_count}"
current_time = time.time()
last_alert = self._last_alert_time.get(alert_key, 0)
last_remind = self._last_remind_time.get(alert_key, 0)
if current_time - last_alert >= self._alert_cooldown:
await alert_manager.trigger_remind(
AlertType.DB_CONNECTION,
if current_time - last_remind >= self._remind_cooldown:
await remind_manager.trigger_remind(
RemindType.DB_CONNECTION_BREAK,
f"数据库连接异常: {unhealthy_count} 个连接不健康"
)
self._last_alert_time[alert_key] = current_time
self._stats["alert_triggered"] += 1
logger.warning(f"数据库健康检查触发告警: {unhealthy_count} 个连接不健康")
self._last_remind_time[alert_key] = current_time
self._stats["remind_triggered"] += 1
logger.warning(f"数据库健康检查触发提示: {unhealthy_count} 个连接不健康")
else:
self._stats["alert_blocked"] += 1
remaining = int(self._alert_cooldown - (current_time - last_alert))
logger.debug(f"数据库健康检查告警被冷却拦截,剩余 {remaining}")
self._stats["remind_blocked"] += 1
remaining = int(self._remind_cooldown - (current_time - last_remind))
logger.debug(f"数据库健康检查提示被冷却拦截,剩余 {remaining}")
class FailedOperationRecoveryManager:
@@ -7,7 +7,7 @@
import time
import asyncio
from typing import Dict, Any, List
from globalobjects import logger as log_config, AlertType, alert_manager
from globalobjects import logger as log_config, RemindType, remind_manager
from globalobjects.db_manager import get_db_managers
from core.settings import MYAPS_MAIN_DB
@@ -84,11 +84,11 @@ class DatabaseCollector:
status["summary"]["total"] += 1
status["summary"]["unhealthy"] += 1
if status["summary"]["unhealthy"] > 0:
await alert_manager.trigger_remind(AlertType.DB_CONNECTION, status["summary"]["unhealthy"])
await remind_manager.trigger_remind(RemindType.DB_CONNECTION_BREAK, status["summary"]["unhealthy"])
except Exception as e:
logger.error(f"获取数据库连接状态失败: {e}")
status["error"] = str(e)
await alert_manager.trigger_remind(AlertType.DB_CONNECTION, status)
await remind_manager.trigger_remind(RemindType.DB_CONNECTION_BREAK, status)
return status
@@ -123,12 +123,12 @@ class DatabaseCollector:
"pool_available": False,
"error": str(e),
}
await alert_manager.trigger_remind(AlertType.DB_POOL, pool_info)
await remind_manager.trigger_remind(RemindType.DB_POOL_BREAK, pool_info)
except Exception as e:
logger.error(f"获取连接池状态失败: {e}")
pool_info["error"] = str(e)
await alert_manager.trigger_remind(AlertType.DB_POOL, pool_info)
await remind_manager.trigger_remind(RemindType.DB_POOL_BREAK, pool_info)
return pool_info
@@ -5,7 +5,7 @@ HTTP 指标采集器
"""
from typing import Dict, Any, List
from globalobjects import AlertType, alert_manager
from globalobjects import RemindType, remind_manager
from ..middleware import http_metrics_collector
from ..storage import request_storage
from ..models import is_internal_ip
@@ -136,7 +136,7 @@ class HTTPCollector:
}
await request_storage.save_request(request_data)
await alert_manager.trigger_remind(AlertType.REQUEST_SLOW, slow_requests)
await remind_manager.trigger_remind(RemindType.REQUEST_SLOW, slow_requests)
return slow_requests
@@ -177,7 +177,7 @@ class HTTPCollector:
}
await request_storage.save_request(request_data)
await alert_manager.trigger_remind(AlertType.REQUEST_ERROR, error_requests)
await remind_manager.trigger_remind(RemindType.REQUEST_ERROR, error_requests)
return error_requests
@@ -2,7 +2,7 @@ import json
from datetime import datetime, timedelta
from typing import List
from globalobjects import AlertType, alert_manager
from globalobjects import RemindType, remind_manager
from apps.common.monitor.models import FailedOperation
from apps.io_api.utils.db_operation import (
db_exec_sql,
@@ -136,8 +136,8 @@ class FailedOperationRecovery:
op.status = "failed"
# 触发最终告警
await alert_manager.trigger_remind(
AlertType.DB_CONNECTION,
await remind_manager.trigger_remind(
RemindType.DB_CONNECTION_BREAK,
{
"operation_id": op.operation_id,
"db_name": op.db_name,
+30 -28
View File
@@ -64,6 +64,7 @@ from pymysqlreplication.row_event import (
from core.settings import MYAPS_DB_HOST, MYAPS_DB_PORT, MYAPS_DB_USER, MYAPS_DB_PASSWORD, MYAPS_MAIN_DB, MYAPS_DBSET_LIST, TURNON_BINLOG_LISTENER, ENABLE_BINLOG_POSITION, MYAPS_ROOT_PASSWORD
from globalobjects import logger as log_config
from globalobjects.reminder import remind_manager, RemindType
from apps.common.utils.thread_pool_manager import global_pool_manager
import os
import asyncio
@@ -864,12 +865,13 @@ class MySQLBinlogListener:
return parts[0], parts[1] # database, table
return None, full_table_name # 无数据库信息,只有表名
def _send_alert(self, message: str, level: str = "warning"):
"""发送告警通知
def _send_remind(self, message: str, level: str = "warning"):
"""发送提示通知
可通过注册 Reminder 实例来自定义告警方式邮件短信等
使用全局 RemindManager 发送提示支持去重和频率限制
"""
# 记录到日志
remind_type = RemindType.BINLOG_LISTENER_BREAK
if level == "error":
logger.error(f"🚨 告警: {message}")
elif level == "warning":
@@ -877,36 +879,36 @@ class MySQLBinlogListener:
else:
logger.info(f"️ 通知: {message}")
# 通过 Reminder 发送告警
if self._alert_reminder:
try:
alert_content = {
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"source": "binlog_listener"
}
# 在事件循环中异步发送告警
if self._event_loop and self._event_loop.is_running():
asyncio.run_coroutine_threadsafe(
self._alert_reminder.remind(alert_content),
self._event_loop
)
else:
# 如果事件循环未运行,同步执行
import asyncio
asyncio.run(self._alert_reminder.remind(alert_content))
except Exception as e:
logger.error(f"通过 Reminder 发送告警失败: {e}")
# 通过全局 RemindManager 发送提示(带去重和频率限制)
remind_content = {
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"source": "binlog_listener"
}
# 在事件循环中异步发送提示
if self._event_loop and self._event_loop.is_running():
asyncio.run_coroutine_threadsafe(
remind_manager.trigger_remind(remind_type, remind_content),
self._event_loop
)
else:
# 如果事件循环未运行,同步执行
asyncio.create_task(remind_manager.trigger_remind(remind_type, remind_content))
def register_alert_handler(self, reminder):
"""注册告警提醒器
def regist_reminder(self, reminder):
"""注册提示提醒器到全局 RemindManager
Args:
reminder: Reminder 实例需实现 async remind 方法
"""
self._alert_reminder = reminder
logger.info("✅ 告警提醒器已注册")
# 注册到全局 RemindManager,支持所有提示类型
remind_manager.register(reminder, [
RemindType.BINLOG_LISTENER_RESUME,
RemindType.BINLOG_LISTENER_BREAK,
])
logger.info("✅ 提示提醒器已注册到全局 RemindManager")
def get_status(self) -> Dict[str, Any]:
"""获取监控状态信息"""
+3 -3
View File
@@ -13,7 +13,7 @@ from tortoise.models import Model as TortoiseBaseModel
from apps.common.monitor.models import FailedOperation
from core.settings import LOG_LEVEL, MYAPS_DB_SET, MYAPS_DBSET_LIST
from globalobjects import AlertType, alert_manager
from globalobjects import RemindType, remind_manager
from globalobjects import logger as log_config
from globalobjects.db_manager import DbManager, get_db_managers
@@ -373,8 +373,8 @@ def retry_on_connection_error(max_retries: int = 3, retry_delay: float = 1.0):
if error_summary_parts:
error_summary = "; ".join(error_summary_parts)
await alert_manager.trigger_remind(
AlertType.DB_CONNECTION,
await remind_manager.trigger_remind(
RemindType.DB_CONNECTION_BREAK,
{
"operation_id": operation_id,
"db_names": valid_dbs,
+3 -3
View File
@@ -17,14 +17,14 @@ from .event_aggregator import get_global_handler_aggregator
EVENT_AGGREGATOR = get_global_handler_aggregator()
from .reminder import Reminder, AlertType, alert_manager, QqEmailReminder
from .reminder import Reminder, RemindType, remind_manager, QqEmailReminder
from .globalconst import StaticString
__all__ = [
"Reminder",
"AlertType",
"alert_manager",
"RemindType",
"remind_manager",
"QqEmailReminder",
"StaticString",
]
+4 -4
View File
@@ -21,7 +21,7 @@ from core.settings import MYAPS_DB_SET, MYAPS_MAIN_DB, THIS_BASE_URL, SCHEDULER_
from .._base import (
get_scheduler_minute, cron_task, CLIENT_LOGGER, CLIENT_SESSION, PROJECT_JSON_FILE,
ApsPayloadSponsor, EventResultPoster, get_session, CacheItem,
AlertType, async_rate_limit, event_batch_handler,
RemindType, async_rate_limit, event_batch_handler,
TSupply, async_service_operation, batch_service_operation
)
@@ -63,11 +63,11 @@ hacyxs_tplus_conn = get_tplus_conn()
from .remind import bus_reminder, ops_reminder
# ⬇️binlog监听告警注册
# ⬇️binlog监听告警注册(统一使用全局AlertManager
from apps.data_opt.utils.binlog_listener import binlog_listener as bl
bl.register_alert_handler(ops_reminder)
CLIENT_LOGGER.info("binlog监听告警提醒器已注册")
bl.regist_reminder(ops_reminder)
CLIENT_LOGGER.info("binlog监听提示提醒器已注册到全局RemindManager")
#################################################################################
-12
View File
@@ -46,17 +46,5 @@
"itemno": "P01",
"plant": "chaoyue",
"planner": "chaoyue"
},
"ops_reminder": {
"smtp_user": "2982212683@qq.com",
"smtp_password": "jyboujldhplddhdf",
"email_from": "2982212683@qq.com",
"email_to": "2982212683@qq.com"
},
"bus_reminder": {
"smtp_user": "2982212683@qq.com",
"smtp_password": "jyboujldhplddhdf",
"email_from": "2982212683@qq.com",
"email_to": "2982212683@qq.com"
}
}
+10 -90
View File
@@ -7,106 +7,26 @@
python remind.py --message "服务异常" --level error --subject "告警测试"
"""
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime
# 将项目根目录添加到路径
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, project_root)
from globalobjects.reminder import QqEmailReminder
def load_project_config():
"""从环境变量和JSON配置文件加载项目配置"""
# 读取 .env 文件获取 PROJECT_JSON 配置
env_path = os.path.join(project_root, '.env')
project_json = 'dev'
if os.path.exists(env_path):
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line.startswith('PROJECT_JSON='):
project_json = line.split('=', 1)[1].strip()
break
# 构建配置文件路径
project_dir = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(project_root, 'project_files', project_dir, f'{project_json}.json')
if not os.path.exists(config_path):
raise FileNotFoundError(f"配置文件不存在: {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
# 全局配置缓存
project_config = load_project_config()
ops_config = project_config.get('ops_reminder', {})
bus_config = project_config.get('bus_reminder', {})
ops_reminder = QqEmailReminder(
smtp_user=ops_config.get('smtp_user', ''),
smtp_password=ops_config.get('smtp_password', ''),
email_from=ops_config.get('email_from', ''),
email_to=ops_config.get('email_to', ''),
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to="2982212683@qq.com",
)
bus_reminder = QqEmailReminder(
smtp_user=bus_config.get('smtp_user', ''),
smtp_password=bus_config.get('smtp_password', ''),
email_from=bus_config.get('email_from', ''),
email_to=bus_config.get('email_to', ''),
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to="2982212683@qq.com",
)
async def send_alert(message: str, level: str = "warning", subject: str = None):
"""发送告警"""
if subject is None:
level_prefix = {
"info": "️ 信息",
"warning": "⚠️ 警告",
"error": "❌ 错误"
}.get(level, "🔔")
subject = f"{level_prefix} 系统告警"
alert_content = {
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"source": "service_daemon"
}
await ops_reminder.remind(alert_content)
return True
def main():
parser = argparse.ArgumentParser(description="项目告警发送工具")
parser.add_argument("--message", required=True, help="告警消息内容")
parser.add_argument("--level", default="warning",
choices=["info", "warning", "error"], help="告警级别")
parser.add_argument("--subject", help="邮件主题(可选)")
args = parser.parse_args()
try:
asyncio.run(send_alert(args.message, args.level, args.subject))
print(f"[SUCCESS] Alert sent: {args.subject or args.message}")
return 0
except Exception as e:
print(f"[ERROR] Failed to send alert: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
import sys
sys.exit(ops_reminder.remind_by_shell())
-3
View File
@@ -14,7 +14,6 @@ from .._base import (
get_scheduler_minute, async_rate_limit, CacheItem,
ApsPayloadSponsor, EventResultPoster, CLIENT_LOGGER, standard_response, get_session, event_batch_handler,
cron_task, add_basic_auth_requests, db_delete, db_bupsert, db_query, PROJECT_JSON_FILE, pdv,
AlertType, QqEmailReminder, Reminder
)
@@ -320,5 +319,3 @@ async def batch_handle_pl_status_a2e(event_data_list: List[Dict], _erp: EventRes
cache = await _aps.establish_production_cache(supplynos=supply_nos)
tasks = [handle_pl_status_a2e(event_data=item, _aps=_aps) for item in event_data_list]
await asyncio.gather(*tasks, return_exceptions=True)
-12
View File
@@ -40,17 +40,5 @@
"planner": "haida",
"leadday_e": 1,
"leadday_f": 1
},
"ops_reminder": {
"smtp_user": "2982212683@qq.com",
"smtp_password": "jyboujldhplddhdf",
"email_from": "2982212683@qq.com",
"email_to": "2982212683@qq.com"
},
"bus_reminder": {
"smtp_user": "2982212683@qq.com",
"smtp_password": "jyboujldhplddhdf",
"email_from": "2982212683@qq.com",
"email_to": "2982212683@qq.com"
}
}
+10 -90
View File
@@ -7,106 +7,26 @@
python remind.py --message "服务异常" --level error --subject "告警测试"
"""
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime
# 将项目根目录添加到路径
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, project_root)
from globalobjects.reminder import QqEmailReminder
def load_project_config():
"""从环境变量和JSON配置文件加载项目配置"""
# 读取 .env 文件获取 PROJECT_JSON 配置
env_path = os.path.join(project_root, '.env')
project_json = 'dev'
if os.path.exists(env_path):
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line.startswith('PROJECT_JSON='):
project_json = line.split('=', 1)[1].strip()
break
# 构建配置文件路径
project_dir = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(project_root, 'project_files', project_dir, f'{project_json}.json')
if not os.path.exists(config_path):
raise FileNotFoundError(f"配置文件不存在: {config_path}")
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
# 全局配置缓存
project_config = load_project_config()
ops_config = project_config.get('ops_reminder', {})
bus_config = project_config.get('bus_reminder', {})
ops_reminder = QqEmailReminder(
smtp_user=ops_config.get('smtp_user', ''),
smtp_password=ops_config.get('smtp_password', ''),
email_from=ops_config.get('email_from', ''),
email_to=ops_config.get('email_to', ''),
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to="2982212683@qq.com",
)
bus_reminder = QqEmailReminder(
smtp_user=bus_config.get('smtp_user', ''),
smtp_password=bus_config.get('smtp_password', ''),
email_from=bus_config.get('email_from', ''),
email_to=bus_config.get('email_to', ''),
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to="2982212683@qq.com",
)
async def send_alert(message: str, level: str = "warning", subject: str = None):
"""发送告警"""
if subject is None:
level_prefix = {
"info": "️ 信息",
"warning": "⚠️ 警告",
"error": "❌ 错误"
}.get(level, "🔔")
subject = f"{level_prefix} 系统告警"
alert_content = {
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"source": "service_daemon"
}
await ops_reminder.remind(alert_content)
return True
def main():
parser = argparse.ArgumentParser(description="项目告警发送工具")
parser.add_argument("--message", required=True, help="告警消息内容")
parser.add_argument("--level", default="warning",
choices=["info", "warning", "error"], help="告警级别")
parser.add_argument("--subject", help="邮件主题(可选)")
args = parser.parse_args()
try:
asyncio.run(send_alert(args.message, args.level, args.subject))
print(f"[SUCCESS] Alert sent: {args.subject or args.message}")
return 0
except Exception as e:
print(f"[ERROR] Failed to send alert: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
import sys
sys.exit(ops_reminder.remind_by_shell())
+1 -1
View File
@@ -18,7 +18,7 @@ from globalobjects.globalconst import OrderStatusEnum
# ❗❗❗❗❗❗❗❗❗❗❗❗⬇️不要删掉,便于各项目文件引用 ❗❗❗❗❗❗❗❗❗❗❗❗
from core.settings import MYAPS_MAIN_DB, THIS_BASE_URL, MYAPS_DB_SET, PROJECT_JSON
from globalobjects import logger as log_config, PROJECT_JSON_FILE, ProjectDefaultValues as pdv, AlertType, QqEmailReminder, Reminder
from globalobjects import logger as log_config, PROJECT_JSON_FILE, ProjectDefaultValues as pdv, RemindType, QqEmailReminder, Reminder
from apps.io_api.utils.common import standard_response
from apps.io_api.utils.db_operation import db_delete, db_bupsert, call_dbprocdure, db_query, db_supsert, db_update_by_index
from apps.io_api.models import TSupply