加固binlog监听器

This commit is contained in:
2026-05-07 21:14:29 +08:00
parent ff79170874
commit d69e8b88ea
13 changed files with 475 additions and 67 deletions
+10
View File
@@ -542,6 +542,16 @@ async def get_event_helpers_metrics():
return monitor_service.get_event_helpers_metrics()
@router.get("/binlog-listener")
async def get_binlog_listener_status():
"""
获取binlog listener状态
返回binlog listener的运行状态、健康状态、待处理事件数、背压状态等信息
"""
return monitor_service.get_binlog_listener_status()
@router.get("/dead-letter")
async def get_dead_letter_events(limit: int = 50):
"""
+16
View File
@@ -261,6 +261,22 @@ class MonitorService:
self._set_cache_data("event_helpers", metrics)
return metrics
def get_binlog_listener_status(self) -> Dict[str, Any]:
"""获取binlog listener状态"""
try:
from apps.data_opt.utils.binlog_listener import binlog_listener
status = binlog_listener.get_status()
return {
"timestamp": time.time(),
"binlog_listener": status
}
except Exception as e:
logger.error(f"获取binlog listener状态失败: {e}")
return {
"timestamp": time.time(),
"error": str(e)
}
def get_redis_metrics(self) -> Dict[str, Any]:
"""获取 Redis 监控指标"""
# 尝试从缓存获取
+280 -43
View File
@@ -338,6 +338,98 @@ class ConnectionHealthChecker:
self._stop_event.wait(self.check_interval)
class _EventLoopHealthChecker:
"""事件循环健康检查器 - 定期检查事件循环状态"""
def __init__(self, listener, check_interval: int = 30):
self._listener = listener
self._check_interval = check_interval
self._stop_event = threading.Event()
self._check_thread = None
self._last_loop_check_time = 0
self._consecutive_failures = 0
self._max_consecutive_failures = 3
self._is_healthy = True
def start(self):
"""启动事件循环健康检查器"""
self._stop_event.clear()
self._check_thread = threading.Thread(
target=self._check_loop,
daemon=True,
name='event-loop-health-checker'
)
self._check_thread.start()
logger.info("✅ 事件循环健康检查器已启动")
def stop(self):
"""停止事件循环健康检查器"""
self._stop_event.set()
if self._check_thread:
self._check_thread.join(timeout=5)
logger.info("🛑 事件循环健康检查器已停止")
def is_healthy(self) -> bool:
"""获取事件循环健康状态"""
return self._is_healthy
def _check_loop(self):
"""健康检查主循环"""
while not self._stop_event.is_set():
self._do_check()
self._stop_event.wait(self._check_interval)
def _do_check(self):
"""执行健康检查"""
try:
event_loop = getattr(self._listener, '_event_loop', None)
if event_loop is None:
logger.debug("⚠️ 事件循环未初始化")
self._consecutive_failures += 1
return
# 检查事件循环是否还在运行
if not event_loop.is_running():
logger.error("❌ 事件循环已停止运行")
self._is_healthy = False
self._consecutive_failures += 1
if self._consecutive_failures >= self._max_consecutive_failures:
# 通过健康检查器发送告警
if hasattr(self._listener, '_health_checker'):
self._listener._health_checker._send_alert(
f"binlog监听事件循环已停止运行,已连续失败 {self._consecutive_failures}",
"error"
)
return
# 检查pending任务数(Python 3.7+
try:
pending_count = len(asyncio.all_tasks(event_loop))
logger.debug(f"事件循环当前pending任务数: {pending_count}")
# 如果pending任务过多,发出警告
if pending_count > 1000:
logger.warning(f"⚠️ 事件循环pending任务过多: {pending_count}")
self._is_healthy = False
if hasattr(self._listener, '_health_checker'):
self._listener._health_checker._send_alert(
f"binlog监听事件循环pending任务过多: {pending_count}",
"warning"
)
else:
self._is_healthy = True
self._consecutive_failures = 0 # 成功后重置
self._last_loop_check_time = time.time()
except Exception as e:
logger.warning(f"⚠️ 事件循环状态检查失败: {e}")
self._consecutive_failures += 1
except Exception as e:
logger.error(f"❌ 事件循环健康检查异常: {e}")
self._is_healthy = False
self._consecutive_failures += 1
class MySQLBinlogListener:
# 单例模式实现
@@ -403,11 +495,25 @@ class MySQLBinlogListener:
check_interval=30 # 每30秒检查一次
)
# 初始化事件循环健康检查器
self._event_loop_health_checker = _EventLoopHealthChecker(
self,
check_interval=30 # 每30秒检查一次
)
# 重试配置
self._max_retry_wait = 300 # 最大重试等待时间(5分钟)
self._consecutive_errors = 0 # 连续错误计数
self._last_error_time = 0 # 上次错误时间
# 背压监控配置
self._pending_events = 0 # 当前待处理事件数
self._pending_lock = threading.Lock() # 计数器锁
self._backpressure_threshold = 10000 # 背压告警阈值
self._backpressure_warning_threshold = int(self._backpressure_threshold * 0.75) # 警告阈值(75%
self._last_backpressure_warning = 0 # 上次背压告警时间
self._backpressure_warning_interval = 60 # 告警间隔(秒)
if MYAPS_DBSET_LIST and TURNON_BINLOG_LISTENER:
# 使用全局线程池管理器
self._min_workers = 5
@@ -761,7 +867,7 @@ class MySQLBinlogListener:
def _send_alert(self, message: str, level: str = "warning"):
"""发送告警通知
可通过注册回调函数来自定义告警方式企业微信钉钉邮件
可通过注册 Reminder 实例来自定义告警方式邮件短信
"""
# 记录到日志
if level == "error":
@@ -771,28 +877,97 @@ class MySQLBinlogListener:
else:
logger.info(f"️ 通知: {message}")
# 这里可以扩展为调用外部告警接口
# 例如:企业微信、钉钉、邮件等
# 通过 Reminder 发送告警
if self._alert_reminder:
try:
alert_content = {
"level": level,
"message": message,
"timestamp": datetime.now().isoformat(),
"source": "binlog_listener"
}
# 在事件循环中异步发送告警
if self._event_loop and self._event_loop.is_running():
asyncio.run_coroutine_threadsafe(
self._alert_reminder.remind(alert_content),
self._event_loop
)
else:
# 如果事件循环未运行,同步执行
import asyncio
asyncio.run(self._alert_reminder.remind(alert_content))
except Exception as e:
logger.error(f"通过 Reminder 发送告警失败: {e}")
def register_alert_handler(self, handler: Callable[[str, str], None]):
"""注册告警处理
def register_alert_handler(self, reminder):
"""注册告警提醒
Args:
handler: 回调函数接收 (message, level) 两个参数
reminder: Reminder 实例需实现 async remind 方法
"""
self._health_checker.register_alert_callback(handler)
logger.info("✅ 告警处理器已注册")
self._alert_reminder = reminder
logger.info("✅ 告警提醒器已注册")
def get_status(self) -> Dict[str, Any]:
"""获取监控状态信息"""
return {
"running": self.running,
"healthy": self._health_checker.is_healthy() if hasattr(self, '_health_checker') else None,
"event_loop_healthy": self._event_loop_health_checker.is_healthy() if hasattr(self, '_event_loop_health_checker') else None,
"current_position": self._current_position,
"consecutive_errors": getattr(self, '_consecutive_errors', 0),
"thread_pool_size": getattr(self._thread_pool, '_max_workers', 'unknown'),
"pending_events": self.get_pending_events_count(),
"backpressure_threshold": self._backpressure_threshold,
"backpressure_percent": round(self.get_pending_events_count() / self._backpressure_threshold * 100, 2),
}
def _increment_pending(self):
"""增加待处理事件计数"""
with self._pending_lock:
self._pending_events += 1
return self._pending_events
def _decrement_pending(self):
"""减少待处理事件计数"""
with self._pending_lock:
if self._pending_events > 0:
self._pending_events -= 1
return self._pending_events
def get_pending_events_count(self):
"""获取当前待处理事件数"""
with self._pending_lock:
return self._pending_events
def _check_backpressure(self):
"""检查背压状态并发送告警"""
pending_count = self.get_pending_events_count()
# 检查是否超过警告阈值
if pending_count > self._backpressure_warning_threshold:
current_time = time.time()
# 检查是否需要发送告警(避免频繁告警)
if current_time - self._last_backpressure_warning > self._backpressure_warning_interval:
warning_msg = f"⚠️ binlog监听背压告警: 待处理事件 {pending_count} 超过阈值 {self._backpressure_warning_threshold}"
logger.warning(warning_msg)
# 通过健康检查器发送告警
if hasattr(self, '_health_checker'):
self._health_checker._send_alert(warning_msg, "warning")
self._last_backpressure_warning = current_time
# 检查是否超过严重阈值
if pending_count > self._backpressure_threshold:
current_time = time.time()
if current_time - self._last_backpressure_warning > self._backpressure_warning_interval:
error_msg = f"❌ binlog监听背压严重: 待处理事件 {pending_count} 超过上限 {self._backpressure_threshold}"
logger.error(error_msg)
if hasattr(self, '_health_checker'):
self._health_checker._send_alert(error_msg, "error")
self._last_backpressure_warning = current_time
return pending_count
def reset_position(self):
"""重置 binlog 位置(下次启动时从头开始)"""
if ENABLE_BINLOG_POSITION and self._position_manager:
@@ -852,6 +1027,12 @@ class MySQLBinlogListener:
except Exception as e:
logger.fail("binlog监听线程池创建", "", str(e))
# 启动健康检查器
self._health_checker.start()
# 启动事件循环健康检查器
self._event_loop_health_checker.start()
# 启动Binlog监控线程
monitoring_thread = threading.Thread(target=self._monitor_binlog_with_retry, daemon=True, name='mysql-monitor-binlog')
monitoring_thread.start()
@@ -1059,12 +1240,19 @@ class MySQLBinlogListener:
return delay * jitter
def _run_async_event(self, event):
"""异步运行事件处理,支持重试机制"""
"""异步运行事件处理,支持重试机制和背压检测"""
# 增加待处理事件计数
self._increment_pending()
# 定期检查背压状态(每100个事件检查一次)
if self.get_pending_events_count() % 100 == 0:
self._check_backpressure()
max_retries = 3
for retry in range(max_retries):
try:
self._thread_pool.submit(self.process_binlog_event, event)
self._thread_pool.submit(self._process_with_counter, event)
return
except Exception as e:
if retry < max_retries - 1:
@@ -1073,8 +1261,18 @@ class MySQLBinlogListener:
time.sleep(delay)
else:
logger.error(f"❌ binlog监听事件处理失败,已达到最大重试次数: {e}")
# 减少待处理计数(因为最终失败了)
self._decrement_pending()
# 添加到DeadLetter队列
self._add_to_dead_letter_queue(event, str(e))
def _process_with_counter(self, event):
"""处理事件并维护待处理计数"""
try:
self.process_binlog_event(event)
finally:
# 无论成功或失败,都减少待处理计数
self._decrement_pending()
def _run_handler(self, handler, *args, **kwargs):
"""运行处理器函数,支持同步和异步函数,带重试机制"""
@@ -1356,40 +1554,79 @@ class MySQLBinlogListener:
return data_str
return str(data)
def stop_monitoring(self):
"""停止监控"""
if self.running:
self.running = False
# 等待一段时间,让当前处理的事件完成
logger.info("⏳ 等待正在处理的事件完成...")
time.sleep(2) # 等待2秒,让当前事件处理完成
# 关闭事件循环
if self._event_loop:
try:
self._event_loop.call_soon_threadsafe(self._event_loop.stop)
if self._loop_thread:
self._loop_thread.join(timeout=5)
logger.success("binlog监听事件循环", "", "已关闭")
except Exception as e:
logger.fail("binlog监听事件循环关闭", "", str(e))
# 关闭线程池
try:
# 等待所有任务完成,但最多等待10秒
self._thread_pool.shutdown(wait=True, cancel_futures=True)
logger.success("binlog监听线程池", f"{self._thread_pool}", "已关闭")
except Exception as e:
logger.fail("binlog监听线程池关闭", f"{self._thread_pool}", str(e))
# 重置状态
self._event_loop = None
self._loop_thread = None
logger.success("binlog监听", f"@{MYAPS_MAIN_DB}", "已停止")
else:
def stop_monitoring(self, graceful_timeout=30):
"""
停止监控优雅停止
Args:
graceful_timeout: 优雅停止最大等待时间
"""
if not self.running:
logger.info("⚠️ binlog监听已经停止")
return
logger.info("🛑 开始停止binlog监听...")
self.running = False
# 1. 停止健康检查器
if hasattr(self, '_health_checker'):
try:
self._health_checker.stop()
logger.info("✅ 健康检查器已停止")
except Exception as e:
logger.warning(f"⚠️ 健康检查器停止失败: {e}")
# 1.5 停止事件循环健康检查器
if hasattr(self, '_event_loop_health_checker'):
try:
self._event_loop_health_checker.stop()
logger.info("✅ 事件循环健康检查器已停止")
except Exception as e:
logger.warning(f"⚠️ 事件循环健康检查器停止失败: {e}")
# 2. 等待待处理事件完成(优雅停止)
pending = self.get_pending_events_count()
if pending > 0:
logger.info(f"⏳ 等待 {pending} 个待处理事件完成...")
start_time = time.time()
while self.get_pending_events_count() > 0:
elapsed = time.time() - start_time
if elapsed > graceful_timeout:
remaining = self.get_pending_events_count()
logger.warning(
f"⚠️ 优雅停止超时 ({graceful_timeout}秒)"
f"仍有 {remaining} 个事件处理中"
)
break
# 短暂等待后再次检查
time.sleep(0.1)
elapsed = time.time() - start_time
logger.info(f"✅ 事件处理等待完成,耗时 {elapsed:.2f}")
# 3. 关闭事件循环
if self._event_loop:
try:
self._event_loop.call_soon_threadsafe(self._event_loop.stop)
if self._loop_thread and self._loop_thread.is_alive():
self._loop_thread.join(timeout=5)
if self._loop_thread.is_alive():
logger.warning("⚠️ 事件循环线程未能正常结束")
logger.info("✅ 事件循环已关闭")
except Exception as e:
logger.warning(f"⚠️ 事件循环关闭失败: {e}")
# 4. 关闭线程池(等待所有任务完成,不取消)
try:
self._thread_pool.shutdown(wait=True, cancel_futures=False)
logger.info("✅ 线程池已关闭")
except Exception as e:
logger.warning(f"⚠️ 线程池关闭失败: {e}")
# 5. 清理状态
self._event_loop = None
self._loop_thread = None
logger.success("binlog监听", f"@{MYAPS_MAIN_DB}", "已完全停止")
@staticmethod
def get_mysql_config(is_single_db=True):
+14 -10
View File
@@ -56,11 +56,23 @@ def get_tplus_conn():
hacyxs_tplus_conn = get_tplus_conn()
#################################################################################
# ⬇️ 项目可复用逻辑
# ⬇️ 通知相关
#################################################################################
planner_email_reminder = QqEmailReminder(
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to=PLANNER_MAILS,
)
# ⬇️binlog监听告警注册
from apps.data_opt.utils.binlog_listener import binlog_listener as bl
bl.register_alert_handler(planner_email_reminder)
CLIENT_LOGGER.info("binlog监听告警提醒器已注册")
#################################################################################
# ⬇️ 定时任务
@@ -171,14 +183,6 @@ def create_custom_rs_push_model(_aps: ApsPayloadSponsor):
return CustomRsPushModel
planner_email_reminder = QqEmailReminder(
smtp_user="2982212683@qq.com",
smtp_password="jyboujldhplddhdf",
email_from="2982212683@qq.com",
email_to=PLANNER_MAILS,
)
@event_batch_handler(reminder=planner_email_reminder)
@batch_service_operation(module="事件处理")
async def batch_handle_pl_status_a2e(event_data_list: list[dict], _erp: EventResultPoster, description="下达生产加工单至 T+"):
@@ -211,4 +215,4 @@ async def batch_handle_pr_status_a2e(pr_data_list: list[dict], _erp: EventResult
_erp=_erp,
auto_approve=_AUTO_APPROVE_PR,
)
+1 -1
View File
@@ -68,7 +68,7 @@ ENGINEER_MAILS=2982212683@qq.com
# 服务守护配置
########################################################################
# 服务名称,如无必要保持默认值 MyAPS_API 不建议修改
SERVICE_DAEMON_NAME=MyAPS_API
SERVICE_NAME=MyAPS_API
# 启用邮件通知
SERVICE_DAEMON_EMAIL_ENABLED=true
# 邮件接收地址
+4 -4
View File
@@ -58,7 +58,7 @@ pip install -r requirements.txt
- `.env.example`:环境变量配置示例
- 复制为 `.env` 并填写实际配置
- **关键配置项**
- `SERVICE_DAEMON_NAME`Windows 服务名称
- `SERVICE_NAME`Windows 服务名称
- `PROTOCOL`:访问协议(http:// 或 https://
- `PORT`:服务端口
- `PYTHON_VENV_DIR`Python 虚拟环境目录
@@ -99,8 +99,8 @@ pip install -r requirements.txt
```ini
# 服务配置
SERVICE_DAEMON_NAME=MyAPS_API # Windows 服务名称
SERVICE_DAEMON_LOG_DIR=logs # 日志目录
SERVICE_NAME=MyAPS_API # Windows 服务名称
SERVICE_LOG_DIR=logs # 日志目录
# 邮件通知配置
SERVICE_DAEMON_EMAIL_ENABLED=false # 启用邮件通知
@@ -193,7 +193,7 @@ Get-Content d:\code\myaps_fastapi\logs\service_daemon.log -Tail 20
### 5.4 动态配置
- **统一配置源**:所有脚本从 `.env` 读取配置
- **服务名称**:通过 `SERVICE_DAEMON_NAME` 配置
- **服务名称**:通过 `SERVICE_NAME` 配置
- **端口协议**:通过 `PROTOCOL``PORT` 动态拼接健康检查 URL
## 6. 注意事项
+5 -5
View File
@@ -8,16 +8,16 @@ set "PROJECT_ROOT=%SCRIPT_DIR%..\.."
set "ENV_FILE=%PROJECT_ROOT%\.env"
if exist "%ENV_FILE%" (
for /f "tokens=1,2 delims==" %%a in ('findstr /C:"SERVICE_DAEMON_NAME" "%ENV_FILE%"') do (
set "SERVICE_DAEMON_NAME=%%b"
set "SERVICE_DAEMON_NAME=!SERVICE_DAEMON_NAME: =!"
for /f "tokens=1,2 delims==" %%a in ('findstr /C:"SERVICE_NAME" "%ENV_FILE%"') do (
set "SERVICE_NAME=%%b"
set "SERVICE_NAME=!SERVICE_NAME: =!"
)
)
if "%SERVICE_DAEMON_NAME%"=="" (
if "%SERVICE_NAME%"=="" (
set "SERVICE_NAME=MyAPS_API"
) else (
set "SERVICE_NAME=%SERVICE_DAEMON_NAME%"
set "SERVICE_NAME=%SERVICE_NAME%"
)
rem Read PROTOCOL and PORT from .env file
+1 -1
View File
@@ -13,7 +13,7 @@ if exist "%ENV_FILE%" (
set "PYTHON_VENV_DIR=%%b"
set "PYTHON_VENV_DIR=%PYTHON_VENV_DIR: =%"
)
for /f "tokens=1,2 delims==" %%a in ('findstr /C:"SERVICE_DAEMON_NAME" "%ENV_FILE%"') do (
for /f "tokens=1,2 delims==" %%a in ('findstr /C:"SERVICE_NAME" "%ENV_FILE%"') do (
set "SERVICE_NAME=%%b"
set "SERVICE_NAME=%SERVICE_NAME: =%"
)
+1 -1
View File
@@ -64,7 +64,7 @@ if (Test-Path $EnvFile) {
$envVariables = Read-EnvFile -EnvFilePath $EnvFile
# Service configuration
if ($envVariables.ContainsKey("SERVICE_DAEMON_NAME")) { $ServiceName = $envVariables["SERVICE_DAEMON_NAME"] }
if ($envVariables.ContainsKey("SERVICE_NAME")) { $ServiceName = $envVariables["SERVICE_NAME"] }
if ($envVariables.ContainsKey("SERVICE_DAEMON_LOG_DIR")) { $LogDir = $envVariables["SERVICE_DAEMON_LOG_DIR"] }
# Email configuration
+1 -1
View File
@@ -12,7 +12,7 @@ for %%i in ("%SCRIPT_DIR%..") do set "PROJECT_ROOT=%%~fi"
rem Read service name from .env file
if exist "%PROJECT_ROOT%\.env" (
for /f "tokens=1,2 delims==" %%a in ('findstr "^SERVICE_DAEMON_NAME=" "%PROJECT_ROOT%\.env"') do (
for /f "tokens=1,2 delims==" %%a in ('findstr "^SERVICE_NAME=" "%PROJECT_ROOT%\.env"') do (
set "SERVICE_NAME=%%b"
)
)
+56
View File
@@ -2731,6 +2731,62 @@ td {
color: var(--text-primary);
}
/* 背压状态和事件循环状态高亮样式 */
#backpressure-status,
#event-loop-health {
padding: 4px 12px;
border-radius: 20px;
font-size: 14px;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.5px;
}
#backpressure-status.healthy,
#event-loop-health.healthy {
background-color: rgba(34, 197, 94, 0.15);
color: var(--success-color);
border: 1px solid rgba(34, 197, 94, 0.3);
}
#backpressure-status.warning,
#event-loop-health.warning {
background-color: rgba(251, 191, 36, 0.15);
color: var(--warning-color);
border: 1px solid rgba(251, 191, 36, 0.3);
animation: pulse-warning 2s infinite;
}
#backpressure-status.error,
#event-loop-health.error {
background-color: rgba(239, 68, 68, 0.15);
color: var(--error-color);
border: 1px solid rgba(239, 68, 68, 0.3);
animation: pulse-error 1s infinite;
}
@keyframes pulse-warning {
0%, 100% {
opacity: 1;
box-shadow: 0 0 0 0 rgba(251, 191, 36, 0.4);
}
50% {
opacity: 0.8;
box-shadow: 0 0 0 6px rgba(251, 191, 36, 0);
}
}
@keyframes pulse-error {
0%, 100% {
opacity: 1;
box-shadow: 0 0 0 0 rgba(239, 68, 68, 0.4);
}
50% {
opacity: 0.7;
box-shadow: 0 0 0 8px rgba(239, 68, 68, 0);
}
}
/* 执行状态标签样式 */
.execution-status {
display: inline-block;
+18
View File
@@ -520,6 +520,24 @@
<span class="summary-label">活跃事件类型</span>
<span class="summary-value" id="events-active-types">--</span>
</div>
<!-- 背压监控 -->
<div class="summary-item" id="backpressure-summary" style="display: none;">
<span class="summary-label">背压状态</span>
<span class="summary-value" id="backpressure-status">正常</span>
</div>
<div class="summary-item" id="backpressure-pending" style="display: none;">
<span class="summary-label">待处理事件</span>
<span class="summary-value" id="backpressure-pending-count">--</span>
</div>
<div class="summary-item" id="backpressure-percent" style="display: none;">
<span class="summary-label">背压使用率</span>
<span class="summary-value" id="backpressure-usage">--%</span>
</div>
<!-- 事件循环健康状态 -->
<div class="summary-item" id="event-loop-status" style="display: none;">
<span class="summary-label">事件循环状态</span>
<span class="summary-value" id="event-loop-health">正常</span>
</div>
</div>
<div class="events-table-container">
<table class="events-table" id="events-table">
+68 -1
View File
@@ -693,7 +693,8 @@ async function refreshAll() {
await Promise.all([
fetchEventStats(),
fetchEventHelpers(),
fetchDeadLetterStats()
fetchDeadLetterStats(),
fetchBinlogListenerStatus()
]);
break;
}
@@ -1865,6 +1866,7 @@ function switchPage(pageName) {
fetchEventStats();
fetchEventHelpers();
fetchDeadLetterStats();
fetchBinlogListenerStatus();
} else if (pageName === 'logs') {
fetchLogsPage();
}
@@ -1889,6 +1891,23 @@ async function fetchEventStats() {
}
}
// 获取binlog listener状态
async function fetchBinlogListenerStatus() {
// 如果用户不活动,不发送请求
if (isInactive) {
console.log('用户不活动,跳过binlog listener状态获取');
return;
}
try {
const response = await fetch(`${API_BASE}/binlog-listener`);
const data = await response.json();
updateBinlogListenerDisplay(data);
} catch (error) {
console.error('获取binlog listener状态失败:', error);
}
}
// 获取事件辅助模块数据
async function fetchEventHelpers() {
// 如果用户不活动,不发送请求
@@ -1957,6 +1976,54 @@ function updateEventStatsDisplay(data) {
}).join('');
}
// 更新binlog listener状态显示
function updateBinlogListenerDisplay(data) {
const listener = data.binlog_listener || {};
// 显示背压监控信息
document.getElementById('backpressure-summary').style.display = 'block';
document.getElementById('backpressure-pending').style.display = 'block';
document.getElementById('backpressure-percent').style.display = 'block';
document.getElementById('event-loop-status').style.display = 'block';
const pendingCount = listener.pending_events || 0;
const threshold = listener.backpressure_threshold || 10000;
const percent = listener.backpressure_percent || 0;
document.getElementById('backpressure-pending-count').textContent = pendingCount;
document.getElementById('backpressure-usage').textContent = percent.toFixed(1) + '%';
// 根据背压状态设置颜色
let statusText = '正常';
let statusClass = 'healthy';
if (percent >= 100) {
statusText = '严重';
statusClass = 'error';
} else if (percent >= 75) {
statusText = '警告';
statusClass = 'warning';
}
const statusEl = document.getElementById('backpressure-status');
statusEl.textContent = statusText;
statusEl.className = 'summary-value ' + statusClass;
// 更新事件循环健康状态
const eventLoopHealthy = listener.event_loop_healthy;
const eventLoopEl = document.getElementById('event-loop-health');
if (eventLoopHealthy === true) {
eventLoopEl.textContent = '正常';
eventLoopEl.className = 'summary-value healthy';
} else if (eventLoopHealthy === false) {
eventLoopEl.textContent = '异常';
eventLoopEl.className = 'summary-value error';
} else {
eventLoopEl.textContent = '未知';
eventLoopEl.className = 'summary-value';
}
}
// 获取事件状态
function getEventStatus(stats) {
const pending = stats.pending_count || 0;