收发请求数据持久化

This commit is contained in:
2026-04-16 22:36:05 +08:00
parent 7016d62b38
commit 64d3edbee2
14 changed files with 1236 additions and 164 deletions
@@ -7,6 +7,7 @@ HTTP 指标采集器
from typing import Dict, Any, List
from ..middleware import http_metrics_collector
from ..allert import AlertType, alert_sender
from ..storage import request_storage
class HTTPCollector:
@@ -16,16 +17,64 @@ class HTTPCollector:
self._collector = http_metrics_collector
def get_metrics(self) -> Dict[str, Any]:
async def get_metrics(self) -> Dict[str, Any]:
"""
获取 HTTP 指标
Returns:
Dict: HTTP 请求指标
"""
# 保存所有请求到数据库
await self.save_all_requests()
return self._collector.get_metrics()
async def save_all_requests(self):
"""
保存所有请求到数据库
"""
from globalobjects import logger as log_config
logger = log_config.get_logger(__name__)
requests = list(self._collector._requests)
logger.info(f"开始保存请求数据,共 {len(requests)} 个请求")
saved_count = 0
for req in requests:
try:
# 检查是否已经保存过(通过时间戳和路径判断)
existing = await request_storage.get_request_by_timestamp_and_path(
req.get("timestamp"),
req.get("path")
)
if not existing:
# 保存请求数据
from datetime import datetime
request_data = {
"timestamp": datetime.fromtimestamp(req.get("timestamp")),
"method": req.get("method"),
"path": req.get("path"),
"query_params": req.get("query_params"),
"status_code": req.get("status_code"),
"response_time": req.get("duration") * 1000, # 转换为毫秒
"client_ip": req.get("client_ip"),
"user_agent": req.get("user_agent"),
"payload_size": len(req.get("request_body", "")) if req.get("request_body") else None,
"response_size": len(req.get("response_body", "")) if req.get("response_body") else None,
"request_body": req.get("request_body"),
"response_body": req.get("response_body"),
"is_slow": req.get("is_slow", False),
"slow_threshold": 1000.0 if req.get("is_slow") else None,
"is_error": req.get("is_error", False),
"error_message": req.get("error_message")
}
await request_storage.save_request(request_data)
saved_count += 1
except Exception as e:
logger.error(f"保存请求数据失败: {e}")
logger.info(f"请求数据保存完成,共保存 {saved_count} 个请求")
async def get_slow_requests(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
获取慢请求列表
@@ -37,6 +86,30 @@ class HTTPCollector:
List: 慢请求列表
"""
slow_requests = self._collector.get_slow_requests(limit)
# 持久化慢请求数据
from datetime import datetime
for req in slow_requests:
# 保存基础请求数据和慢请求字段
request_data = {
"timestamp": datetime.fromtimestamp(req.get("timestamp")),
"method": req.get("method"),
"path": req.get("path"),
"query_params": req.get("query_params"),
"status_code": req.get("status_code"),
"response_time": req.get("duration") * 1000, # 转换为毫秒
"client_ip": req.get("client_ip"),
"user_agent": req.get("user_agent"),
"payload_size": len(req.get("request_body", "")) if req.get("request_body") else None,
"response_size": len(req.get("response_body", "")) if req.get("response_body") else None,
"request_body": req.get("request_body"),
"response_body": req.get("response_body"),
"is_slow": True,
"slow_threshold": 1000.0,
"is_error": False
}
await request_storage.save_request(request_data)
await alert_sender.trigger_alert(AlertType.REQUEST_SLOW, slow_requests)
return slow_requests
@@ -52,6 +125,30 @@ class HTTPCollector:
List: 错误请求列表
"""
error_requests = self._collector.get_error_requests(limit)
# 持久化错误请求数据
from datetime import datetime
for req in error_requests:
# 保存基础请求数据和错误请求字段
request_data = {
"timestamp": datetime.fromtimestamp(req.get("timestamp")),
"method": req.get("method"),
"path": req.get("path"),
"query_params": req.get("query_params"),
"status_code": req.get("status_code"),
"response_time": req.get("duration") * 1000, # 转换为毫秒
"client_ip": req.get("client_ip"),
"user_agent": req.get("user_agent"),
"payload_size": len(req.get("request_body", "")) if req.get("request_body") else None,
"response_size": len(req.get("response_body", "")) if req.get("response_body") else None,
"request_body": req.get("request_body"),
"response_body": req.get("response_body"),
"is_error": True,
"error_message": req.get("error_message"),
"is_slow": False
}
await request_storage.save_request(request_data)
await alert_sender.trigger_alert(AlertType.REQUEST_ERROR, error_requests)
return error_requests
@@ -59,3 +156,102 @@ class HTTPCollector:
def reset_stats(self):
"""重置统计"""
self._collector.reset_stats()
async def get_requests_by_date(self, date: str, limit: int = 1000) -> List[Dict[str, Any]]:
"""
按日期获取请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
List: 请求记录列表
"""
requests = await request_storage.get_requests_by_date(date, limit)
return [{
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"path": req.path,
"query_params": req.query_params,
"status_code": req.status_code,
"response_time": req.response_time,
"client_ip": req.client_ip,
"user_agent": req.user_agent,
"payload_size": req.payload_size,
"response_size": req.response_size,
"request_body": req.request_body,
"response_body": req.response_body,
"is_slow": req.is_slow,
"slow_threshold": req.slow_threshold,
"is_error": req.is_error,
"error_message": req.error_message
} for req in requests]
async def get_slow_requests_by_date(self, date: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
按日期获取慢请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
List: 慢请求记录列表
"""
slow_requests = await request_storage.get_slow_requests_by_date(date, limit)
return [{
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"path": req.path,
"query_params": req.query_params,
"status_code": req.status_code,
"response_time": req.response_time,
"client_ip": req.client_ip,
"user_agent": req.user_agent,
"payload_size": req.payload_size,
"response_size": req.response_size,
"request_body": req.request_body,
"response_body": req.response_body,
"is_slow": req.is_slow,
"slow_threshold": req.slow_threshold,
"is_error": req.is_error,
"error_message": req.error_message
} for req in slow_requests]
async def get_error_requests_by_date(self, date: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
按日期获取错误请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
List: 错误请求记录列表
"""
error_requests = await request_storage.get_error_requests_by_date(date, limit)
return [{
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"path": req.path,
"query_params": req.query_params,
"status_code": req.status_code,
"response_time": req.response_time,
"client_ip": req.client_ip,
"user_agent": req.user_agent,
"payload_size": req.payload_size,
"response_size": req.response_size,
"request_body": req.request_body,
"response_body": req.response_body,
"is_slow": req.is_slow,
"slow_threshold": req.slow_threshold,
"is_error": req.is_error,
"error_message": req.error_message
} for req in error_requests]
@@ -10,6 +10,8 @@ import json
import threading
from typing import Dict, Any, List
from collections import deque, defaultdict
from datetime import datetime
from ..storage import outbound_request_storage
class OutboundHTTPCollector:
@@ -119,7 +121,7 @@ class OutboundHTTPCollector:
"response_body": response_body,
"error_message": error_message,
"module": module,
"is_error": status_code >= 400 or error_message,
"is_error": bool(status_code >= 400 or error_message),
"is_slow": duration >= self._slow_threshold,
}
@@ -183,6 +185,28 @@ class OutboundHTTPCollector:
status_codes_to_remove.append(code)
for code in status_codes_to_remove:
del self._stats["status_codes"][code]
# 持久化到数据库
try:
request_data = {
"timestamp": datetime.fromtimestamp(request_info["timestamp"]),
"method": method,
"url": url,
"status_code": status_code,
"duration": duration,
"request_headers": json.dumps(request_headers) if request_headers else None,
"request_body": request_body,
"response_headers": json.dumps(response_headers) if response_headers else None,
"response_body": response_body,
"error_message": error_message,
"module": module,
"is_error": request_info["is_error"],
"is_slow": request_info["is_slow"],
}
await outbound_request_storage.save_request(request_data)
except Exception as e:
# 记录异常,确保即使发生异常也不会影响主流程
print(f"保存对外请求到数据库失败: {e}")
def record_request_sync(
self,
@@ -243,7 +267,7 @@ class OutboundHTTPCollector:
"response_body": response_body,
"error_message": error_message,
"module": module,
"is_error": status_code >= 400 or error_message,
"is_error": bool(status_code >= 400 or error_message),
"is_slow": duration >= self._slow_threshold,
}
@@ -318,6 +342,58 @@ class OutboundHTTPCollector:
except Exception as e:
# 记录异常,确保即使发生异常也不会影响主流程
print(f"更新统计信息时发生异常: {e}")
# 异步保存到数据库,避免阻塞同步线程
try:
import asyncio
from datetime import datetime
# 准备保存到数据库的数据
request_data = {
"timestamp": datetime.fromtimestamp(request_info["timestamp"]),
"method": method,
"url": url,
"status_code": status_code,
"duration": duration,
"request_headers": json.dumps(request_headers) if request_headers else None,
"request_body": request_body,
"response_headers": json.dumps(response_headers) if response_headers else None,
"response_body": response_body,
"error_message": error_message,
"module": module,
"is_error": request_info["is_error"],
"is_slow": request_info["is_slow"],
}
# 直接使用同步方式保存到数据库,避免事件循环问题
try:
from tortoise import Tortoise
from core.database import TORTOISE_ORM_CONFIG
# 初始化Tortoise ORM
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def save_to_db():
try:
# 检查Tortoise是否已经初始化
if not Tortoise._inited:
await Tortoise.init(config=TORTOISE_ORM_CONFIG)
await outbound_request_storage.save_request(request_data)
except Exception as e:
print(f"保存对外请求到数据库失败: {e}")
finally:
# 不要关闭数据库连接,避免影响其他线程
pass
# 运行事件循环直到任务完成
loop.run_until_complete(save_to_db())
except Exception as e:
# 记录异常,确保即使发生异常也不会影响主流程
print(f"同步保存对外请求到数据库失败: {e}")
except Exception as e:
# 记录异常,确保即使发生异常也不会影响主流程
print(f"创建数据库保存任务失败: {e}")
def get_metrics(self) -> Dict[str, Any]:
"""
@@ -454,6 +530,105 @@ class OutboundHTTPCollector:
"errors": 0,
}),
}
async def get_requests_by_date(self, date: str, limit: int = 1000) -> List[Dict[str, Any]]:
"""
按日期获取对外请求记录
Args:
date: 日期字符串 (YYYY-MM-DD)
limit: 返回数量限制
Returns:
对外请求记录列表
"""
requests = await outbound_request_storage.get_requests_by_date(date, limit)
# 转换为字典格式
result = []
for req in requests:
result.append({
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"url": req.url,
"status_code": req.status_code,
"duration": req.duration,
"request_headers": req.request_headers,
"request_body": req.request_body,
"response_headers": req.response_headers,
"response_body": req.response_body,
"error_message": req.error_message,
"module": req.module,
"is_error": req.is_error,
"is_slow": req.is_slow,
})
return result
async def get_slow_requests_by_date(self, date: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
按日期获取对外慢请求记录
Args:
date: 日期字符串 (YYYY-MM-DD)
limit: 返回数量限制
Returns:
对外慢请求记录列表
"""
slow_requests = await outbound_request_storage.get_slow_requests_by_date(date, limit)
# 转换为字典格式
result = []
for req in slow_requests:
result.append({
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"url": req.url,
"status_code": req.status_code,
"duration": req.duration,
"request_headers": req.request_headers,
"request_body": req.request_body,
"response_headers": req.response_headers,
"response_body": req.response_body,
"error_message": req.error_message,
"module": req.module,
"is_error": req.is_error,
"is_slow": req.is_slow,
})
return result
async def get_error_requests_by_date(self, date: str, limit: int = 100) -> List[Dict[str, Any]]:
"""
按日期获取对外错误请求记录
Args:
date: 日期字符串 (YYYY-MM-DD)
limit: 返回数量限制
Returns:
对外错误请求记录列表
"""
error_requests = await outbound_request_storage.get_error_requests_by_date(date, limit)
# 转换为字典格式
result = []
for req in error_requests:
result.append({
"id": req.id,
"timestamp": req.timestamp.isoformat(),
"method": req.method,
"url": req.url,
"status_code": req.status_code,
"duration": req.duration,
"request_headers": req.request_headers,
"request_body": req.request_body,
"response_headers": req.response_headers,
"response_body": req.response_body,
"error_message": req.error_message,
"module": req.module,
"is_error": req.is_error,
"is_slow": req.is_slow,
})
return result
# 全局对外 HTTP 请求收集器实例
+1 -1
View File
@@ -223,7 +223,7 @@ class HTTPMonitorMiddleware(BaseHTTPMiddleware):
def __init__(self, app, include_paths: Optional[List[str]] = None):
super().__init__(app)
self.include_paths = include_paths or [
"/api", # 监控 API 路径的请求
"/api", # 监控 API 路径的请求
]
async def dispatch(self, request: Request, call_next: Callable) -> Response:
+63
View File
@@ -0,0 +1,63 @@
from tortoise import fields
from tortoise.models import Model
from datetime import datetime
class APIRequest(Model):
"""API 请求记录模型"""
id = fields.IntField(pk=True, auto_generate=True)
timestamp = fields.DatetimeField(default=datetime.utcnow, description="请求时间")
method = fields.CharField(max_length=10, description="HTTP 方法")
path = fields.CharField(max_length=512, description="请求路径")
query_params = fields.TextField(null=True, description="查询参数")
status_code = fields.IntField(description="响应状态码")
response_time = fields.FloatField(description="响应时间(毫秒)")
client_ip = fields.CharField(max_length=64, null=True, description="客户端 IP")
user_agent = fields.TextField(null=True, description="用户代理")
payload_size = fields.IntField(null=True, description="请求体大小")
response_size = fields.IntField(null=True, description="响应体大小")
request_body = fields.TextField(null=True, description="请求体")
response_body = fields.TextField(null=True, description="响应体")
is_slow = fields.BooleanField(default=False, description="是否慢请求")
slow_threshold = fields.FloatField(null=True, description="慢请求阈值(毫秒)")
is_error = fields.BooleanField(default=False, description="是否错误请求")
error_message = fields.TextField(null=True, description="错误信息")
class Meta:
table = "api_requests"
indexes = [
("timestamp",),
("path",),
("status_code",),
("response_time",),
("is_slow",),
("is_error",),
]
class OutboundAPIRequest(Model):
"""对外 HTTP 请求记录模型"""
id = fields.IntField(pk=True, auto_generate=True)
timestamp = fields.DatetimeField(default=datetime.utcnow, description="请求时间")
method = fields.CharField(max_length=10, description="HTTP 方法")
url = fields.TextField(description="请求 URL")
status_code = fields.IntField(description="响应状态码")
duration = fields.FloatField(description="响应时间(秒)")
request_headers = fields.TextField(null=True, description="请求头")
request_body = fields.TextField(null=True, description="请求体")
response_headers = fields.TextField(null=True, description="响应头")
response_body = fields.TextField(null=True, description="响应体")
error_message = fields.TextField(null=True, description="错误信息")
module = fields.CharField(max_length=255, null=True, description="发起请求的模块")
is_error = fields.BooleanField(default=False, description="是否错误请求")
is_slow = fields.BooleanField(default=False, description="是否慢请求")
class Meta:
table = "outbound_api_requests"
indexes = [
("timestamp",),
("module",),
("status_code",),
("is_error",),
("is_slow",),
]
+110 -3
View File
@@ -116,7 +116,7 @@ async def get_http_metrics():
返回 HTTP 请求统计、状态码分布、路径统计等信息
"""
return monitor_service.get_http_metrics()
return await monitor_service.get_http_metrics()
@router.get("/http/slow")
@@ -130,7 +130,7 @@ async def get_slow_requests(limit: int = 10):
Returns:
慢请求列表
"""
return {"slow_requests": monitor_service.http_collector.get_slow_requests(limit)}
return {"slow_requests": await monitor_service.http_collector.get_slow_requests(limit)}
@router.get("/http/errors")
@@ -144,7 +144,7 @@ async def get_error_requests(limit: int = 10):
Returns:
错误请求列表
"""
return {"error_requests": monitor_service.http_collector.get_error_requests(limit)}
return {"error_requests": await monitor_service.http_collector.get_error_requests(limit)}
@router.post("/http/reset")
@@ -159,6 +159,113 @@ async def reset_http_stats():
return {"message": "HTTP 统计已重置"}
@router.get("/http/requests")
async def get_requests_by_date(date: str, limit: int = 1000):
"""
按日期获取请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
请求记录列表
"""
requests = await monitor_service.http_collector.get_requests_by_date(date, limit)
return {"requests": requests, "count": len(requests), "date": date}
@router.get("/http/slow/date")
async def get_slow_requests_by_date(date: str, limit: int = 100):
"""
按日期获取慢请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
慢请求记录列表
"""
slow_requests = await monitor_service.http_collector.get_slow_requests_by_date(date, limit)
return {"slow_requests": slow_requests, "count": len(slow_requests), "date": date}
@router.get("/http/errors/date")
async def get_error_requests_by_date(date: str, limit: int = 100):
"""
按日期获取错误请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
错误请求记录列表
"""
error_requests = await monitor_service.http_collector.get_error_requests_by_date(date, limit)
return {"error_requests": error_requests, "count": len(error_requests), "date": date}
# 对外 HTTP 请求端点
@router.get("/outbound-http")
async def get_outbound_http_metrics():
"""
获取对外请求指标
返回对外 HTTP 请求统计、状态码分布、URL 统计等信息
"""
return monitor_service.get_outbound_http_metrics()
@router.get("/outbound-http/requests")
async def get_outbound_requests_by_date(date: str, limit: int = 1000):
"""
按日期获取对外请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
对外请求记录列表
"""
requests = await monitor_service.outbound_http_collector.get_requests_by_date(date, limit)
return {"requests": requests, "count": len(requests), "date": date}
@router.get("/outbound-http/slow/date")
async def get_outbound_slow_requests_by_date(date: str, limit: int = 100):
"""
按日期获取对外慢请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
对外慢请求记录列表
"""
slow_requests = await monitor_service.outbound_http_collector.get_slow_requests_by_date(date, limit)
return {"slow_requests": slow_requests, "count": len(slow_requests), "date": date}
@router.get("/outbound-http/errors/date")
async def get_outbound_error_requests_by_date(date: str, limit: int = 100):
"""
按日期获取对外错误请求记录
Args:
date: 查询日期,格式:YYYY-MM-DD
limit: 返回数量限制
Returns:
对外错误请求记录列表
"""
error_requests = await monitor_service.outbound_http_collector.get_error_requests_by_date(date, limit)
return {"error_requests": error_requests, "count": len(error_requests), "date": date}
@router.get("/logs")
async def get_recent_logs(limit: int = 50, level: str = None):
"""
+7 -6
View File
@@ -181,7 +181,7 @@ class MonitorService:
self._set_cache_data("scheduler", metrics)
return metrics
def get_http_metrics(self) -> Dict[str, Any]:
async def get_http_metrics(self) -> Dict[str, Any]:
"""获取 HTTP 指标"""
# 尝试从缓存获取
cached = self._get_cached_data("http")
@@ -189,7 +189,7 @@ class MonitorService:
return cached
# 缓存过期,重新采集
metrics = self.http_collector.get_metrics()
metrics = await self.http_collector.get_metrics()
# 设置缓存
self._set_cache_data("http", metrics)
@@ -238,7 +238,7 @@ class MonitorService:
"resource": self.get_resource_metrics(),
"database": await self.get_database_metrics(),
"scheduler": self.get_scheduler_metrics(),
"http": self.get_http_metrics(),
"http": await self.get_http_metrics(),
"outbound_http": self.get_outbound_http_metrics(),
"alerts": self.get_recent_alerts(10),
}
@@ -296,7 +296,7 @@ class MonitorService:
# 检查 HTTP
try:
http_metrics = self.http_collector.get_metrics()
http_metrics = await self.get_http_metrics()
summary = http_metrics.get("summary", {})
error_rate = summary.get("error_rate", 0)
if error_rate < 5:
@@ -679,6 +679,7 @@ class MonitorService:
async def _broadcast_loop(self):
"""广播循环,定期发送监控数据"""
from datetime import datetime
try:
while self._broadcast_running:
# 收集监控数据
@@ -689,10 +690,10 @@ class MonitorService:
logs = self.get_recent_logs(limit=100)
# 收集 API 请求数据
api_requests = self.http_collector.get_metrics()
api_requests = await self.http_collector.get_requests_by_date(datetime.now().strftime('%Y-%m-%d'))
# 收集发送请求数据
outbound_requests = self.get_outbound_http_metrics()
outbound_requests = await self.outbound_http_collector.get_requests_by_date(datetime.now().strftime('%Y-%m-%d'))
data = {
"type": "monitor_data",
+182
View File
@@ -0,0 +1,182 @@
from typing import List, Dict, Any
from datetime import datetime
from .models import APIRequest, OutboundAPIRequest
class RequestStorage:
"""请求数据存储服务"""
async def save_request(self, request_data: Dict[str, Any]) -> APIRequest:
"""保存单个请求数据"""
request = await APIRequest.create(**request_data)
return request
async def save_requests(self, requests_data: List[Dict[str, Any]]) -> List[APIRequest]:
"""批量保存请求数据"""
if not requests_data:
return []
requests = await APIRequest.bulk_create(
[APIRequest(**data) for data in requests_data]
)
return requests
async def get_request_by_timestamp_and_path(self, timestamp: float, path: str) -> APIRequest:
"""通过时间戳和路径获取请求记录"""
from datetime import datetime, timedelta
# 转换时间戳为 datetime
timestamp_dt = datetime.fromtimestamp(timestamp)
# 使用时间范围查询,避免时间戳精度问题
return await APIRequest.filter(
timestamp__gte=timestamp_dt - timedelta(seconds=0.1),
timestamp__lte=timestamp_dt + timedelta(seconds=0.1),
path=path
).first()
async def get_requests_by_date(self, date: str, limit: int = 1000) -> List[APIRequest]:
"""按日期获取请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
requests = await APIRequest.filter(
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-timestamp').all()
return requests
except Exception as e:
print(f"获取请求数据失败: {e}")
return []
async def get_slow_requests_by_date(self, date: str, limit: int = 100) -> List[APIRequest]:
"""按日期获取慢请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
slow_requests = await APIRequest.filter(
is_slow=True,
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-response_time').all()
return slow_requests
except Exception as e:
print(f"获取慢请求数据失败: {e}")
return []
async def get_error_requests_by_date(self, date: str, limit: int = 100) -> List[APIRequest]:
"""按日期获取错误请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
error_requests = await APIRequest.filter(
is_error=True,
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-timestamp').all()
return error_requests
except Exception as e:
print(f"获取错误请求数据失败: {e}")
return []
async def get_slow_requests_by_threshold(self, threshold: float, limit: int = 100) -> List[APIRequest]:
"""按阈值获取慢请求记录"""
slow_requests = await APIRequest.filter(
is_slow=True,
slow_threshold__gte=threshold
).limit(limit).order_by('-slow_threshold').all()
return slow_requests
async def get_error_requests_by_status(self, status_code: int, limit: int = 100) -> List[APIRequest]:
"""按状态码获取错误请求记录"""
error_requests = await APIRequest.filter(
is_error=True,
status_code=status_code
).limit(limit).order_by('-timestamp').all()
return error_requests
async def clean_old_data(self, days: int = 7):
"""清理指定天数前的数据"""
from datetime import timedelta
cutoff_date = datetime.utcnow() - timedelta(days=days)
# 直接删除主表数据
await APIRequest.filter(timestamp__lt=cutoff_date).delete()
class OutboundRequestStorage:
"""对外请求数据存储服务"""
async def save_request(self, request_data: Dict[str, Any]) -> OutboundAPIRequest:
"""保存单个对外请求数据"""
request = await OutboundAPIRequest.create(**request_data)
return request
async def save_requests(self, requests_data: List[Dict[str, Any]]) -> List[OutboundAPIRequest]:
"""批量保存对外请求数据"""
if not requests_data:
return []
requests = await OutboundAPIRequest.bulk_create(
[OutboundAPIRequest(**data) for data in requests_data]
)
return requests
async def get_requests_by_date(self, date: str, limit: int = 1000) -> List[OutboundAPIRequest]:
"""按日期获取对外请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
requests = await OutboundAPIRequest.filter(
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-timestamp').all()
return requests
except Exception as e:
print(f"获取对外请求数据失败: {e}")
return []
async def get_slow_requests_by_date(self, date: str, limit: int = 100) -> List[OutboundAPIRequest]:
"""按日期获取对外慢请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
requests = await OutboundAPIRequest.filter(
is_slow=True,
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-duration').all()
return requests
except Exception as e:
print(f"获取对外慢请求数据失败: {e}")
return []
async def get_error_requests_by_date(self, date: str, limit: int = 100) -> List[OutboundAPIRequest]:
"""按日期获取对外错误请求记录"""
try:
date_obj = datetime.strptime(date, '%Y-%m-%d')
next_day = date_obj.replace(day=date_obj.day + 1)
requests = await OutboundAPIRequest.filter(
is_error=True,
timestamp__gte=date_obj,
timestamp__lt=next_day
).limit(limit).order_by('-timestamp').all()
return requests
except Exception as e:
print(f"获取对外错误请求数据失败: {e}")
return []
async def get_requests_by_module(self, module: str, limit: int = 100) -> List[OutboundAPIRequest]:
"""按模块获取对外请求记录"""
requests = await OutboundAPIRequest.filter(
module=module
).limit(limit).order_by('-timestamp').all()
return requests
async def clean_old_data(self, days: int = 7):
"""清理指定天数前的对外请求数据"""
from datetime import timedelta
cutoff_date = datetime.utcnow() - timedelta(days=days)
await OutboundAPIRequest.filter(timestamp__lt=cutoff_date).delete()
# 全局存储实例
request_storage = RequestStorage()
outbound_request_storage = OutboundRequestStorage()
+99 -1
View File
@@ -6,10 +6,108 @@ from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from tortoise.contrib.fastapi import register_tortoise
from core.settings import TORTOISE_ORM_CONFIG, MYAPS_MAIN_DB, MYAPS_DBSET_LIST
from core.settings import (
BASE_DIR,
MYAPS_MAIN_DB, MYAPS_DBSET_LIST, MYAPS_DB_HOST, MYAPS_DB_PORT, MYAPS_DB_USER, MYAPS_DB_PASSWORD,
THIS_DB_NAME, THIS_DB_HOST, THIS_DB_PORT, THIS_DB_USER, THIS_DB_PASSWORD
)
from globalobjects import logger as log_config
######################################################################################
# 计算连接池大小:根据账套数量动态调整,避免连接总数过多
# 总连接数 = 账套数 × maxsize,应控制在合理范围内(建议不超过150)
import os
cpu_count = os.cpu_count() or 4
db_count = len(MYAPS_DBSET_LIST)
# 动态计算连接池大小:
# - 单账套:maxsize=15
# - 多账套:根据账套数量递减,最小为3
# - 确保总连接数不超过 30(考虑到服务器限制非常严格)
maxsize_per_db = min(15, max(3, 30 // max(db_count, 1)))
minsize_per_db = min(2, maxsize_per_db // 2)
# logger.info(f"数据库连接池配置:{db_count}个账套,每个账套minsize={minsize_per_db}, maxsize={maxsize_per_db}")
# 数据库配置
connections = {
"local_data": {
"engine": "tortoise.backends.sqlite",
"credentials": {
"file_path": BASE_DIR / "local_data.sqlite3", # 统一管理数据文件
"journal_mode": "WAL", # 写前日志,提升并发性能
"synchronous": "NORMAL", # 性能与安全的平衡
"cache_size": -100000, # 100MB 内存缓存
"foreign_keys": True, # 启用外键约束
"timeout": 30, # 连接超时时间
"check_same_thread": False,
},
"maxsize": 5, # 最大连接数
"minsize": 1, # 最小连接数
}
}
# 为每个账套创建MySQL连接配置
for db in MYAPS_DBSET_LIST:
connections[db] = {
"engine": "tortoise.backends.mysql",
"credentials": {
"host": MYAPS_DB_HOST,
"port": MYAPS_DB_PORT,
"user": MYAPS_DB_USER,
"password": MYAPS_DB_PASSWORD,
"database": db,
"charset": "utf8mb4",
"connect_timeout": 30, # 减少连接超时时间到30秒
"minsize": minsize_per_db, # 根据账套数量动态调整最小连接数
"maxsize": maxsize_per_db, # 根据账套数量动态调整最大连接数
"ssl": None,
"echo": False,
"pool_recycle": 300, # 减少连接回收时间到5分钟,防止连接超时和泄漏
}
}
TORTOISE_ORM_CONFIG = {
"connections": connections,
"apps": {
"io_api_models": {
"models": ["apps.io_api.models",],
"default_connection": MYAPS_MAIN_DB # 使用MyAPS账套
},
"monitor_models": {
"models": ["apps.common.monitor.models", "aerich.models"],
"default_connection": "local_data" # 使用local_data数据库
},
},
}
if THIS_DB_NAME:
# 创建PostgreSQL连接配置
connections[THIS_DB_NAME] = {
"engine": "tortoise.backends.asyncpg",
"credentials": {
"host": THIS_DB_HOST,
"port": THIS_DB_PORT,
"user": THIS_DB_USER,
"password": THIS_DB_PASSWORD,
"database": THIS_DB_NAME,
"min_size": 3, # 保持最小连接数
"max_size": 10, # 最大连接数
}
}
TORTOISE_ORM_CONFIG["apps"]["data_opt_models"] = {
"models": ["apps.data_opt.models", "aerich.models"],
"default_connection": THIS_DB_NAME,
}
class ConnectionLeakDetector:
"""连接泄漏检测器"""
-74
View File
@@ -109,77 +109,3 @@ THIS_DB_PORT = int(os.getenv("THIS_DB_PORT") or json_env_config.get("THIS_DB_POR
THIS_DB_USER = os.getenv("THIS_DB_USER") or json_env_config.get("THIS_DB_USER")
THIS_DB_PASSWORD = os.getenv("THIS_DB_PASSWORD") or json_env_config.get("THIS_DB_PASSWORD")
THIS_DB_NAME = os.getenv("THIS_DB_NAME") or json_env_config.get("THIS_DB_NAME")
######################################################################################
# 数据库配置
connections = {}
# 为每个账套创建MySQL连接配置
# 计算连接池大小:根据账套数量动态调整,避免连接总数过多
# 总连接数 = 账套数 × maxsize,应控制在合理范围内(建议不超过150)
import os
cpu_count = os.cpu_count() or 4
db_count = len(MYAPS_DBSET_LIST)
# 动态计算连接池大小:
# - 单账套:maxsize=15
# - 多账套:根据账套数量递减,最小为3
# - 确保总连接数不超过 30(考虑到服务器限制非常严格)
maxsize_per_db = min(15, max(3, 30 // max(db_count, 1)))
minsize_per_db = min(2, maxsize_per_db // 2)
logger.info(f"数据库连接池配置:{db_count}个账套,每个账套minsize={minsize_per_db}, maxsize={maxsize_per_db}")
for db in MYAPS_DBSET_LIST:
connections[db] = {
"engine": "tortoise.backends.mysql",
"credentials": {
"host": MYAPS_DB_HOST,
"port": MYAPS_DB_PORT,
"user": MYAPS_DB_USER,
"password": MYAPS_DB_PASSWORD,
"database": db,
"charset": "utf8mb4",
"connect_timeout": 30, # 减少连接超时时间到30秒
"minsize": minsize_per_db, # 根据账套数量动态调整最小连接数
"maxsize": maxsize_per_db, # 根据账套数量动态调整最大连接数
"ssl": None,
"echo": False,
"pool_recycle": 300, # 减少连接回收时间到5分钟,防止连接超时和泄漏
}
}
TORTOISE_ORM_CONFIG = {
"connections": connections,
"apps": {
"io_api_models": {
"models": ["apps.io_api.models",],
"default_connection": MYAPS_MAIN_DB # 使用MyAPS账套
},
},
}
if THIS_DB_NAME:
# 创建PostgreSQL连接配置
connections[THIS_DB_NAME] = {
"engine": "tortoise.backends.asyncpg",
"credentials": {
"host": THIS_DB_HOST,
"port": THIS_DB_PORT,
"user": THIS_DB_USER,
"password": THIS_DB_PASSWORD,
"database": THIS_DB_NAME,
"min_size": 3, # 保持最小连接数
"max_size": 10, # 最大连接数
}
}
TORTOISE_ORM_CONFIG["apps"]["data_opt_models"] = {
"models": ["apps.data_opt.models", "aerich.models"],
"default_connection": THIS_DB_NAME,
}
@@ -0,0 +1,98 @@
from tortoise import BaseDBAsyncClient
RUN_IN_TRANSACTION = True
async def upgrade(db: BaseDBAsyncClient) -> str:
return """
CREATE TABLE IF NOT EXISTS "api_requests" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"timestamp" TIMESTAMP NOT NULL /* 请求时间 */,
"method" VARCHAR(10) NOT NULL /* HTTP 方法 */,
"path" VARCHAR(512) NOT NULL /* 请求路径 */,
"query_params" TEXT /* 查询参数 */,
"status_code" INT NOT NULL /* 响应状态码 */,
"response_time" REAL NOT NULL /* 响应时间毫秒 */,
"client_ip" VARCHAR(64) /* 客户端 IP */,
"user_agent" TEXT /* 用户代理 */,
"payload_size" INT /* 请求体大小 */,
"response_size" INT /* 响应体大小 */,
"request_body" TEXT /* 请求体 */,
"response_body" TEXT /* 响应体 */,
"is_slow" INT NOT NULL DEFAULT 0 /* 是否慢请求 */,
"slow_threshold" REAL /* 慢请求阈值毫秒 */,
"is_error" INT NOT NULL DEFAULT 0 /* 是否错误请求 */,
"error_message" TEXT /* 错误信息 */
) /* API 请求记录模型 */;
CREATE INDEX IF NOT EXISTS "idx_api_request_timesta_26587d" ON "api_requests" ("timestamp");
CREATE INDEX IF NOT EXISTS "idx_api_request_path_6217b7" ON "api_requests" ("path");
CREATE INDEX IF NOT EXISTS "idx_api_request_status__d9e8c8" ON "api_requests" ("status_code");
CREATE INDEX IF NOT EXISTS "idx_api_request_respons_c31164" ON "api_requests" ("response_time");
CREATE INDEX IF NOT EXISTS "idx_api_request_is_slow_f53a5b" ON "api_requests" ("is_slow");
CREATE INDEX IF NOT EXISTS "idx_api_request_is_erro_ad6213" ON "api_requests" ("is_error");
CREATE TABLE IF NOT EXISTS "outbound_api_requests" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"timestamp" TIMESTAMP NOT NULL /* 请求时间 */,
"method" VARCHAR(10) NOT NULL /* HTTP 方法 */,
"url" TEXT NOT NULL /* 请求 URL */,
"status_code" INT NOT NULL /* 响应状态码 */,
"duration" REAL NOT NULL /* 响应时间 */,
"request_headers" TEXT /* 请求头 */,
"request_body" TEXT /* 请求体 */,
"response_headers" TEXT /* 响应头 */,
"response_body" TEXT /* 响应体 */,
"error_message" TEXT /* 错误信息 */,
"module" VARCHAR(255) /* 发起请求的模块 */,
"is_error" INT NOT NULL DEFAULT 0 /* 是否错误请求 */,
"is_slow" INT NOT NULL DEFAULT 0 /* 是否慢请求 */
) /* 对外 HTTP 请求记录模型 */;
CREATE INDEX IF NOT EXISTS "idx_outbound_ap_timesta_61cbc2" ON "outbound_api_requests" ("timestamp");
CREATE INDEX IF NOT EXISTS "idx_outbound_ap_module_99fdb1" ON "outbound_api_requests" ("module");
CREATE INDEX IF NOT EXISTS "idx_outbound_ap_status__edc615" ON "outbound_api_requests" ("status_code");
CREATE INDEX IF NOT EXISTS "idx_outbound_ap_is_erro_503daa" ON "outbound_api_requests" ("is_error");
CREATE INDEX IF NOT EXISTS "idx_outbound_ap_is_slow_6abd9c" ON "outbound_api_requests" ("is_slow");
CREATE TABLE IF NOT EXISTS "aerich" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"version" VARCHAR(255) NOT NULL,
"app" VARCHAR(100) NOT NULL,
"content" JSON NOT NULL
);"""
async def downgrade(db: BaseDBAsyncClient) -> str:
return """
"""
MODELS_STATE = (
"eJztm21v4jgQgP9KlE9bqVdByAusTifRLnvl1ELV0rvVtlXkJE6JGmI2cbbtrfrfb2wIeS"
"M0oaXJSXxBYTyTxM/YY3sGfokz4jmU+PqMWNgNjvoXw0v8I8QBFT8Lv0QPzTBcbNA6FEQ0"
"n+d1WANFhsvN0dzR/YUBb0BGQH1ksmfYyA0wiCwcmL4zpw7xmAU8QbgNu4at3YaqKUvs2m"
"jdhoqtKCBBUhuuta7B7mYRE27nePdVDUPPgXfSKbnHdIp9ML+5A7HjWfgJB+zrjUidGbw1"
"ms1FaLoR54hOF1cgpGGgm9DbhcDHwZx4AdwPTBYiJ9ADlzyuvmDfJ754xx4yf9BtB7tWCr"
"Njsdficp0+z7ls6NGvXJH11IDnueHMi5Xnz3RKvJW243GX3GMP+4hidnvqhwywF7ru0h8R"
"80X/Y5VFxxM2FrZR6DI3MeuclyJhgv9SZAII8DC8TcA7eM+e8pvUljW521HlLqjwN1lJtJ"
"dF9+K+Lww5gdFEfOHtiKKFBndOzC12Ug7fF6DAmtczTBlmUFpLy6PoIgs2wriJbCSI0cYD"
"PmIr/m6HnsmYCiPi4aOQmh55/EPMz4r0wFYVW70Ne4otZ2fBei9sQDwZng+uJv3zC3anWR"
"D8cDm7/mTAWiQufc5IP6kHTE5gIi9m+eomwj/DyanAvgrfx6MBZ0sCeu/zJ8Z6k+8ieycU"
"UqJDj3VkJQFF4kgEqomABDOWrJktJ1Pkr3d1bJHxMyDalWfXzhrxdDK5EJj7jB5zpaWUc5"
"84Q0+6i717iECfhXZrgzv/7l+enPYvP7VbGReNli0Sb3pJIeWhrQLQSL9unJloD+os2ndL"
"Too0VaUtlcAKWoVceVsaLHTQf9bnyEezIA94gp8KAnzWbivQy0j+LpxVDSuMNgbOSsfkIU"
"hrvT34DL5NUnEnovnpvP/tIBV7zsajPyP1BP2Ts/FxBnpyeS6/pmasXl9cdz20Fdm04BP3"
"5NtQkwyI92qrBVsYrdtqlxzg77L2xmDT25wc2q8uQQVwc5YZvDYzrRNwvKDehrbd6oLEwA"
"bA7lkSl/TePNa/jK+PzwbCxeXgZHg1HI/SKytvZCIQOJRDuRz0zzIeMF0He1R31ux2iqN1"
"yqj2SKIYiEUPqQNxW0PYFoYX24RrVS4RrVW5MFizpjTbMMC+joAErRKp01a109UUqRvRlT"
"HugERuqc2M03P0DNPe0gPn3yqBOmu2VaR+T+bJPYhsK8Bc6UkgUcyWXXOgrog2Z1c722SM"
"bgpbnszQDWI9VwkUWbvaQ0V22DYzSKyGZHXeGcPagWfHcjOBR4mrHOpjQlyMvIKMVGyV4W"
"yA2a52dgUZRBjVqsSOgrLEds1tVUqO9jdjPx6Pz1LYj4dZrtfnxwM4gx+kN3T5YMKY6XQK"
"Y3VK3DU5jQ1b6rzplnvqdz0mZljDllplm2ml1TEbvbFeZWgrD/uVWdPGfU9p9/iR3W7q6O"
"fo9BkOAthAV4ntOcPaY3sStmzjNjusqyV3KB8R4VndwX5IZNCZwEDmwyPyLT3XQiRSpJtv"
"mkmzrAR54Bhr2WPWv0wVaRxSg4SeVa7mtEb7sEztiSzt9OpFKHZOtXtst9lWhWXWdouS1P"
"a32aJABQRCFxeUqOISVKo6ta9HbXUc2Nej9vWofT1qZ/Wo0Hcr5eAW6nXDTE4J4fryrDmr"
"/7468gGJISuEvrP3y1HdcIpLGjW/JtKwQ1uUUptiZGG/UoF1jWntZ4jkgqr0Ou+wlO4mIb"
"fPf9aR/9xqkOdta8eejDBNHuX7tPOHAt/ngmrBvswZ5HhvOLysLGoHrXSsNvvZl6alo7mm"
"duU4n6Jo2xxqJEUpcaoBrcJjDW/bZ5lrzzLvC1o7ht2w3HIf+445FTf9h2GhcVjq/wux7m"
"tJ4+Ic5OvJ3H0e9vB987A/Yb+79jBevLIlTGpOJZWnuPs1i02RChCX6v9PgO1WuVTmplxm"
"LpkJT6Rrf1T419V4VPCLzdgkA/Lagw7eWI5JDwXXCehdM7FuoMh6vXkbm92xHqZz9uwGtZ"
"c0X/4Dnu2PMA=="
)
+1 -1
View File
@@ -1,4 +1,4 @@
[tool.aerich]
tortoise_orm = "core.settings.TORTOISE_ORM_CONFIG"
tortoise_orm = "core.database.TORTOISE_ORM_CONFIG"
location = "./migrations"
src_folder = "./."
+18
View File
@@ -0,0 +1,18 @@
@echo off
REM 设置项目目录
set PROJECT_DIR=JYHDXS
REM 显示当前设置
echo Project Directory: %PROJECT_DIR%
echo Starting database migration...
REM 使用完整路径运行 Python
"%~dp0\..\..\venv\Scripts\python.exe" -m aerich init -t core.database.TORTOISE_ORM
"%~dp0\..\..\venv\Scripts\python.exe" -m aerich migrate --name monitor_models
"%~dp0\..\..\venv\Scripts\python.exe" -m aerich upgrade
echo Database migration completed!
pause
+12 -2
View File
@@ -286,8 +286,13 @@
<main class="main-content page-content" id="page-api-requests" style="display: none;">
<section class="card full-width">
<div class="card-header">
<h2>接收请求记录 <span style="font-size: 12px; color: #888; font-weight: normal;">(最近24小时)</span></h2>
<h2>接收请求记录</h2>
<div class="card-actions">
<div class="date-selector" style="display: none;">
<label for="api-date-picker">选择日期:</label>
<input type="date" id="api-date-picker" value="">
<button class="btn btn-sm" onclick="fetchRequestsByDate()">查询</button>
</div>
<div class="toggle-switch">
<label>
<input type="checkbox" id="show-api-internal-requests" onchange="toggleAPIIntternalRequests()">
@@ -383,8 +388,13 @@
<main class="main-content page-content" id="page-outbound-requests" style="display: none;">
<section class="card full-width">
<div class="card-header">
<h2>发送请求记录 <span style="font-size: 12px; color: #888; font-weight: normal;">(最近24小时)</span></h2>
<h2>发送请求记录</h2>
<div class="card-actions">
<div class="date-selector" style="display: none;">
<label for="outbound-date-picker">选择日期:</label>
<input type="date" id="outbound-date-picker" value="">
<button class="btn btn-sm" onclick="fetchOutboundRequestsByDate()">查询</button>
</div>
<div class="toggle-switch">
<label>
<input type="checkbox" id="show-internal-requests" onchange="toggleInternalRequests()">
+271 -73
View File
@@ -47,6 +47,23 @@ const domCache = {};
// 时间格式化缓存
const timeCache = {};
// 页面加载时设置默认日期
window.addEventListener('DOMContentLoaded', function() {
const today = new Date().toISOString().split('T')[0];
// 设置接收请求日期选择器的默认值
const datePicker = document.getElementById('api-date-picker');
if (datePicker) {
datePicker.value = today;
}
// 设置发送请求日期选择器的默认值
const outboundDatePicker = document.getElementById('outbound-date-picker');
if (outboundDatePicker) {
outboundDatePicker.value = today;
}
});
// 获取DOM元素,优先从缓存中获取
function getElement(id) {
if (!domCache[id]) {
@@ -188,6 +205,13 @@ document.addEventListener('DOMContentLoaded', async () => {
initWebSocket();
startInactivityTimer();
// 设置默认日期为今天
const today = new Date().toISOString().split('T')[0];
const datePicker = document.getElementById('api-date-picker');
if (datePicker) {
datePicker.value = today;
}
// 监听用户活动
window.addEventListener('mousemove', resetInactivityTimer);
window.addEventListener('keydown', resetInactivityTimer);
@@ -431,54 +455,61 @@ function handleWebSocketData(data) {
return;
}
// 更新资源指标
if (data.resource) {
updateResourceDisplay(data.resource);
}
// 检查系统状态是否为错误
const statusIndicator = getElement('status-indicator');
const isSystemError = statusIndicator && statusIndicator.classList.contains('error');
// 更新数据库指标
if (data.database) {
updateDatabaseDisplay(data.database);
// 如果系统状态正常,才处理数据并更新最后更新时间
if (!isSystemError) {
// 更新资源指标
if (data.resource) {
updateResourceDisplay(data.resource);
}
// 更新数据库指标
if (data.database) {
updateDatabaseDisplay(data.database);
}
// 更新调度器指标
if (data.scheduler) {
updateSchedulerDisplay(data.scheduler);
}
// 更新 HTTP 指标
if (data.http) {
updateHTTPDisplay(data.http);
}
// 更新对外 HTTP 指标
if (data.outbound_http) {
// 这里可以添加对外 HTTP 指标的更新逻辑
}
// 更新告警
if (data.alerts) {
updateAlertsDisplay(data.alerts);
}
// 更新日志数据
if (data.logs) {
updateLogsPageDisplay(data.logs);
}
// 更新 API 请求数据
if (data.api_requests) {
updateAPIRequestsDisplay(data.api_requests);
}
// 更新发送请求数据
if (data.outbound_requests) {
updateOutboundRequestsDisplay(data.outbound_requests);
}
// 更新最后更新时间
updateLastUpdateTime();
}
// 更新调度器指标
if (data.scheduler) {
updateSchedulerDisplay(data.scheduler);
}
// 更新 HTTP 指标
if (data.http) {
updateHTTPDisplay(data.http);
}
// 更新对外 HTTP 指标
if (data.outbound_http) {
// 这里可以添加对外 HTTP 指标的更新逻辑
}
// 更新告警
if (data.alerts) {
updateAlertsDisplay(data.alerts);
}
// 更新日志数据
if (data.logs) {
updateLogsPageDisplay(data.logs);
}
// 更新 API 请求数据
if (data.api_requests) {
updateAPIRequestsDisplay(data.api_requests);
}
// 更新发送请求数据
if (data.outbound_requests) {
updateOutboundRequestsDisplay(data.outbound_requests);
}
// 更新最后更新时间
updateLastUpdateTime();
// 检查告警条件
checkAlertConditions();
}
@@ -618,35 +649,45 @@ async function refreshAll() {
}
try {
// 基础数据,无论哪个页面都需要刷新
await Promise.all([
fetchHealth(),
fetchResource(),
fetchDatabase(),
fetchScheduler(),
fetchHTTP(),
fetchAlerts(),
fetchOverviewOutboundRequests(),
// 无论在哪个页面都刷新日志列表、API 请求记录和发送请求记录
fetchLogsPage(),
fetchAPIRequests(),
fetchOutboundRequests()
]);
// 先检查健康状态
await fetchHealth();
// 只刷新当前页面的特定数据
switch (currentPage) {
case 'database':
await Promise.all([
fetchDatabaseDetail(),
fetchEventStats()
]);
break;
case 'scheduler':
await fetchSchedulerPage();
break;
// 检查系统状态是否为错误
const statusIndicator = getElement('status-indicator');
const isSystemError = statusIndicator && statusIndicator.classList.contains('error');
// 如果系统状态正常,才继续获取其他数据
if (!isSystemError) {
// 基础数据,无论哪个页面都需要刷新
await Promise.all([
fetchResource(),
fetchDatabase(),
fetchScheduler(),
fetchHTTP(),
fetchAlerts(),
fetchOverviewOutboundRequests(),
// 无论在哪个页面都刷新日志列表、API 请求记录和发送请求记录
fetchLogsPage(),
fetchAPIRequests(),
fetchOutboundRequests()
]);
// 只刷新当前页面的特定数据
switch (currentPage) {
case 'database':
await Promise.all([
fetchDatabaseDetail(),
fetchEventStats()
]);
break;
case 'scheduler':
await fetchSchedulerPage();
break;
}
updateLastUpdateTime();
}
updateLastUpdateTime();
checkAlertConditions();
} catch (error) {
console.error('刷新数据失败:', error);
@@ -1327,8 +1368,20 @@ function formatDateTime(timestamp, format = 'relative') {
}
const now = new Date();
const date = new Date(timestamp * 1000);
const diff = now.getTime() / 1000 - timestamp;
let date;
let timestampSec;
if (typeof timestamp === 'string') {
// 处理ISO字符串格式
date = new Date(timestamp);
timestampSec = date.getTime() / 1000;
} else {
// 处理数字时间戳(秒)
date = new Date(timestamp * 1000);
timestampSec = timestamp;
}
const diff = now.getTime() / 1000 - timestampSec;
let result;
@@ -2222,6 +2275,151 @@ async function resetAPIStats() {
}
}
// 按日期获取请求记录
async function fetchRequestsByDate() {
// 如果用户不活动,不发送请求
if (isInactive) {
console.log('用户不活动,跳过按日期获取请求记录');
return;
}
const datePicker = document.getElementById('api-date-picker');
const date = datePicker.value;
if (!date) {
alert('请选择日期');
return;
}
try {
const response = await fetch(`${API_BASE}/http/requests?date=${date}`);
const data = await response.json();
updateAPIRequestsTable(data.requests);
// 更新页面标题,显示当前查询的日期
const pageTitle = document.querySelector('#page-api-requests h2');
if (pageTitle) {
pageTitle.textContent = `接收请求记录 (${date})`;
}
} catch (error) {
console.error('按日期获取请求记录失败:', error);
alert('获取请求记录失败,请稍后重试');
}
}
function updateAPIRequestsTable(requests) {
const tableBody = document.getElementById('api-requests-tbody');
if (!tableBody) return;
// 限制存储最近100条数据
const limitedRequests = requests.slice(0, 100);
if (limitedRequests.length === 0) {
tableBody.innerHTML = '<tr><td colspan="9" class="empty-state">暂无 API 请求记录</td></tr>';
return;
}
// 使用文档片段减少DOM操作次数
const fragment = document.createDocumentFragment();
limitedRequests.forEach((req, index) => {
const timeStr = formatDateTime(req.timestamp, 'datetime');
const methodClass = req.method.toLowerCase();
const isSuccess = req.status_code < 400;
const statusClass = isSuccess ? 'success' : 'error';
const errorMsg = req.error_message || '';
// 转义特殊字符,防止HTML属性值被截断
const escapedPath = req.path.replace(/"/g, '&quot;');
const escapedErrorMsg = errorMsg.replace(/"/g, '&quot;');
// 根据响应时间设置样式
const durationMs = req.response_time;
let durationClass = '';
if (durationMs > 1000) {
durationClass = 'duration-slow';
} else if (durationMs > 500) {
durationClass = 'duration-medium';
}
// 处理查询参数显示
let queryParamsDisplay = '';
if (req.query_params) {
try {
const parsedParams = JSON.parse(req.query_params);
if (Object.keys(parsedParams).length > 0) {
queryParamsDisplay = Object.entries(parsedParams)
.map(([key, value]) => `${key}=${value}`)
.join('&');
}
} catch (e) {
queryParamsDisplay = req.query_params;
}
}
const statusDescription = getStatusDescription(req.status_code);
const statusText = statusDescription ? `${req.status_code} ${statusDescription}` : `${req.status_code}`;
const tr = document.createElement('tr');
tr.onclick = () => {
// 存储当前请求数据,以便点击时使用
window.apiRequestsData = {
recent_requests: limitedRequests
};
showRequestDetail(index);
};
tr.dataset.requestIndex = index;
tr.innerHTML = `
<td class="font-mono">${index + 1}</td>
<td>${timeStr}</td>
<td><span class="api-method ${methodClass}">${req.method}</span></td>
<td class="font-mono" style="font-size: 12px;">${req.path}</td>
<td class="font-mono" style="font-size: 12px; max-width: 200px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;" title="${queryParamsDisplay}">${queryParamsDisplay || '-'}</td>
<td><span class="api-status ${statusClass}">${statusText}</span></td>
<td class="${durationClass}">${durationMs.toFixed(0)}ms</td>
<td>${req.client_ip}</td>
<td class="error-message-cell" title="${escapedErrorMsg}">${errorMsg}</td>
`;
fragment.appendChild(tr);
});
tableBody.innerHTML = '';
tableBody.appendChild(fragment);
}
// 按日期获取对外请求记录
async function fetchOutboundRequestsByDate() {
// 如果用户不活动,不发送请求
if (isInactive) {
console.log('用户不活动,跳过按日期获取对外请求记录');
return;
}
const datePicker = document.getElementById('outbound-date-picker');
const date = datePicker.value;
if (!date) {
alert('请选择日期');
return;
}
try {
const response = await fetch(`${API_BASE}/outbound-http/requests?date=${date}`);
const data = await response.json();
updateOutboundRequestsTable(data.requests);
// 更新页面标题,显示当前查询的日期
const pageTitle = document.querySelector('#page-outbound-requests h2');
if (pageTitle) {
pageTitle.textContent = `发送请求记录 (${date})`;
}
} catch (error) {
console.error('按日期获取对外请求记录失败:', error);
alert('获取对外请求记录失败,请稍后重试');
}
}
// 日志页面
async function fetchLogsPage() {
// 如果用户不活动,不发送请求