mirror of
https://github.com/rnvm9wjdtj-bot/myaps_api.git
synced 2026-06-02 05:54:40 +00:00
feat: 日志历史查询页面重构优化
- 页面布局重构:header集成查询控件,分页移至tabs右侧 - 样式美化:全局等宽字体、Bootstrap图标、组件阴影动画 - 级别过滤改为≥逻辑,关键词支持多字段(路径/URL/请求体/响应体) - 时间范围可选(支持查询全部数据) - 统计数据改为后端计算(总数+级别分布) - 高级筛选分解到各页签,边界隔离修复 - 表头冻结、列表自适应高度 - 新增日志查询索引迁移脚本
This commit is contained in:
@@ -136,6 +136,9 @@ class APIRequest(Model):
|
||||
("is_error",),
|
||||
("is_internal",),
|
||||
("request_id",),
|
||||
("client_ip",),
|
||||
("method",),
|
||||
("timestamp", "status_code"),
|
||||
]
|
||||
|
||||
|
||||
@@ -167,6 +170,8 @@ class OutboundAPIRequest(Model):
|
||||
("is_error",),
|
||||
("is_slow",),
|
||||
("is_internal",),
|
||||
("method",),
|
||||
("timestamp", "status_code"),
|
||||
]
|
||||
|
||||
|
||||
@@ -193,6 +198,7 @@ class SystemLog(Model):
|
||||
("level",),
|
||||
("module",),
|
||||
("function",),
|
||||
("timestamp", "level"),
|
||||
]
|
||||
|
||||
|
||||
|
||||
+584
-15
@@ -15,6 +15,7 @@ from tortoise import Tortoise
|
||||
from .service import monitor_service
|
||||
from .log_stream_service import log_stream_service
|
||||
from .storage import request_storage, outbound_request_storage, system_log_storage
|
||||
from .models import APIRequest, OutboundAPIRequest, SystemLog
|
||||
from core.settings import TIMEZONE
|
||||
from globalobjects import logger as log_config
|
||||
|
||||
@@ -895,25 +896,49 @@ async def get_logs_by_time_range(
|
||||
|
||||
@router.get("/history/query")
|
||||
async def get_history_by_time_range(
|
||||
start_time: str,
|
||||
end_time: str,
|
||||
start_time: str = None,
|
||||
end_time: str = None,
|
||||
timezone_offset: int = None,
|
||||
level: str = None,
|
||||
type: str = None,
|
||||
limit: int = 1000
|
||||
limit: int = 1000,
|
||||
module: str = None,
|
||||
keyword: str = None,
|
||||
status_code_min: int = None,
|
||||
status_code_max: int = None,
|
||||
duration_min: float = None,
|
||||
duration_max: float = None,
|
||||
client_ip: str = None,
|
||||
method: str = None,
|
||||
page: int = 1,
|
||||
page_size: int = 50,
|
||||
sort_by: str = None,
|
||||
sort_order: str = "desc"
|
||||
):
|
||||
"""
|
||||
统一的历史查询端点(用于日志联动功能)
|
||||
|
||||
按时间范围查询接收请求、发送请求和系统日志,并支持数据类型过滤
|
||||
按时间范围查询接收请求、发送请求和系统日志,并支持数据类型过滤和高级过滤条件
|
||||
|
||||
Args:
|
||||
start_time: 开始时间(本地时间,ISO格式,如 "2024-01-15T18:30:00")
|
||||
end_time: 结束时间(本地时间,ISO格式,如 "2024-01-15T18:35:00")
|
||||
start_time: 开始时间(本地时间,ISO格式,如 "2024-01-15T18:30:00"),不传则查询全部
|
||||
end_time: 结束时间(本地时间,ISO格式,如 "2024-01-15T18:35:00"),不传则查询全部
|
||||
timezone_offset: 时区偏移分钟数(默认从配置读取,UTC+8 = 480)
|
||||
level: 日志级别过滤(DEBUG/INFO/WARNING/ERROR/CRITICAL)
|
||||
type: 数据类型过滤(http/outbound/logs,不传则返回全部)
|
||||
limit: 每种数据类型返回数量限制
|
||||
limit: 每种数据类型返回数量限制(用于无分页场景)
|
||||
module: 模块名过滤(支持多选,逗号分隔,如 "order_service,payment")
|
||||
keyword: 关键词全文搜索(匹配message、path、url等文本字段)
|
||||
status_code_min: 状态码范围下限
|
||||
status_code_max: 状态码范围上限
|
||||
duration_min: 响应时间范围下限(毫秒)
|
||||
duration_max: 响应时间范围上限(毫秒)
|
||||
client_ip: 客户端IP过滤
|
||||
method: HTTP请求方法过滤(GET/POST/PUT/DELETE等)
|
||||
page: 页码(从1开始)
|
||||
page_size: 每页条数(50/100/200/500)
|
||||
sort_by: 排序字段(timestamp/duration/status_code)
|
||||
sort_order: 排序方向(asc/desc)
|
||||
|
||||
Returns:
|
||||
包含接收请求、发送请求和系统日志的查询结果
|
||||
@@ -931,9 +956,58 @@ async def get_history_by_time_range(
|
||||
except (ValueError, TypeError):
|
||||
timezone_offset = 480 # 默认 UTC+8
|
||||
|
||||
# 转换为UTC时间
|
||||
utc_start = local_to_utc(start_time, timezone_offset)
|
||||
utc_end = local_to_utc(end_time, timezone_offset)
|
||||
# 转换为UTC时间(如果提供了时间范围)
|
||||
utc_start = local_to_utc(start_time, timezone_offset) if start_time else None
|
||||
utc_end = local_to_utc(end_time, timezone_offset) if end_time else None
|
||||
|
||||
# 参数验证
|
||||
if status_code_min is not None and status_code_max is not None and status_code_min > status_code_max:
|
||||
raise HTTPException(status_code=400, detail="status_code_min 不能大于 status_code_max")
|
||||
|
||||
if duration_min is not None and duration_max is not None and duration_min > duration_max:
|
||||
raise HTTPException(status_code=400, detail="duration_min 不能大于 duration_max")
|
||||
|
||||
if status_code_min is not None and (status_code_min < 100 or status_code_min > 599):
|
||||
raise HTTPException(status_code=400, detail="status_code_min 必须在 100-599 范围内")
|
||||
|
||||
if status_code_max is not None and (status_code_max < 100 or status_code_max > 599):
|
||||
raise HTTPException(status_code=400, detail="status_code_max 必须在 100-599 范围内")
|
||||
|
||||
if duration_min is not None and duration_min < 0:
|
||||
raise HTTPException(status_code=400, detail="duration_min 不能为负数")
|
||||
|
||||
if duration_max is not None and duration_max < 0:
|
||||
raise HTTPException(status_code=400, detail="duration_max 不能为负数")
|
||||
|
||||
# 分页参数验证
|
||||
if page < 1:
|
||||
raise HTTPException(status_code=400, detail="page 必须 >= 1")
|
||||
|
||||
valid_page_sizes = [50, 100, 200, 500]
|
||||
if page_size not in valid_page_sizes:
|
||||
raise HTTPException(status_code=400, detail=f"page_size 必须是 {valid_page_sizes} 之一")
|
||||
|
||||
if sort_order not in ["asc", "desc"]:
|
||||
raise HTTPException(status_code=400, detail="sort_order 必须是 asc 或 desc")
|
||||
|
||||
if sort_by and sort_by not in ["timestamp", "duration", "status_code"]:
|
||||
raise HTTPException(status_code=400, detail="sort_by 必须是 timestamp/duration/status_code 之一")
|
||||
|
||||
# 构建过滤条件字典(传递给storage层)
|
||||
filter_params = {
|
||||
"module": module.split(',') if module else None,
|
||||
"keyword": keyword,
|
||||
"status_code_min": status_code_min,
|
||||
"status_code_max": status_code_max,
|
||||
"duration_min": duration_min,
|
||||
"duration_max": duration_max,
|
||||
"client_ip": client_ip,
|
||||
"method": method,
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"sort_by": sort_by,
|
||||
"sort_order": sort_order
|
||||
}
|
||||
|
||||
result = {
|
||||
"http_requests": [],
|
||||
@@ -941,8 +1015,9 @@ async def get_history_by_time_range(
|
||||
"logs": [],
|
||||
"time_range": {
|
||||
"original": {"start": start_time, "end": end_time},
|
||||
"converted": {"start": utc_start.isoformat(), "end": utc_end.isoformat()}
|
||||
}
|
||||
"converted": {"start": utc_start.isoformat() if utc_start else None, "end": utc_end.isoformat() if utc_end else None}
|
||||
},
|
||||
"filter_params": {k: v for k, v in filter_params.items() if v is not None}
|
||||
}
|
||||
|
||||
# 根据类型参数决定查询哪些数据
|
||||
@@ -952,7 +1027,9 @@ async def get_history_by_time_range(
|
||||
|
||||
# 查询接收请求
|
||||
if should_query_http:
|
||||
http_requests = await request_storage.get_requests_by_time_range(utc_start, utc_end, limit)
|
||||
http_requests = await request_storage.get_requests_with_filters(
|
||||
utc_start, utc_end, limit, filter_params
|
||||
)
|
||||
result["http_requests"] = [{
|
||||
"id": req.id,
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
@@ -974,7 +1051,9 @@ async def get_history_by_time_range(
|
||||
|
||||
# 查询发送请求
|
||||
if should_query_outbound:
|
||||
outbound_requests = await outbound_request_storage.get_requests_by_time_range(utc_start, utc_end, limit)
|
||||
outbound_requests = await outbound_request_storage.get_requests_with_filters(
|
||||
utc_start, utc_end, limit, filter_params
|
||||
)
|
||||
result["outbound_requests"] = [{
|
||||
"id": req.id,
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
@@ -994,7 +1073,9 @@ async def get_history_by_time_range(
|
||||
|
||||
# 查询系统日志
|
||||
if should_query_logs:
|
||||
logs = await system_log_storage.get_logs_by_time_range(utc_start, utc_end, level, limit)
|
||||
logs = await system_log_storage.get_logs_with_filters(
|
||||
utc_start, utc_end, level, limit, filter_params
|
||||
)
|
||||
result["logs"] = [{
|
||||
"id": log.id,
|
||||
"timestamp": log.timestamp.isoformat(),
|
||||
@@ -1006,4 +1087,492 @@ async def get_history_by_time_range(
|
||||
"stack_trace": log.stack_trace
|
||||
} for log in logs]
|
||||
|
||||
# 计算分页元数据
|
||||
total_count = 0
|
||||
if should_query_http:
|
||||
total_count = await request_storage.count_requests_with_filters(utc_start, utc_end, filter_params)
|
||||
elif should_query_outbound:
|
||||
total_count = await outbound_request_storage.count_requests_with_filters(utc_start, utc_end, filter_params)
|
||||
elif should_query_logs:
|
||||
total_count = await system_log_storage.count_logs_with_filters(utc_start, utc_end, level, filter_params)
|
||||
|
||||
if total_count > 0:
|
||||
total_pages = (total_count + page_size - 1) // page_size
|
||||
result["pagination"] = {
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
"total_count": total_count,
|
||||
"total_pages": total_pages,
|
||||
"has_next": page < total_pages,
|
||||
"has_prev": page > 1,
|
||||
"start_index": (page - 1) * page_size + 1,
|
||||
"end_index": min(page * page_size, total_count)
|
||||
}
|
||||
|
||||
# 统计总数(用于结果摘要显示)
|
||||
result["stats"] = {
|
||||
"http_count": await request_storage.count_requests_with_filters(utc_start, utc_end, filter_params) if should_query_http else 0,
|
||||
"outbound_count": await outbound_request_storage.count_requests_with_filters(utc_start, utc_end, filter_params) if should_query_outbound else 0,
|
||||
"logs_count": await system_log_storage.count_logs_with_filters(utc_start, utc_end, level, filter_params) if should_query_logs else 0
|
||||
}
|
||||
|
||||
# 统计日志级别分布(仅查询系统日志时)
|
||||
if should_query_logs and result["stats"]["logs_count"] > 0:
|
||||
level_stats = {}
|
||||
for lv in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
|
||||
count = await system_log_storage.count_logs_with_filters(utc_start, utc_end, lv, filter_params)
|
||||
if count > 0:
|
||||
level_stats[lv] = count
|
||||
result["stats"]["level_distribution"] = level_stats
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ========== 统计数据端点 ==========
|
||||
|
||||
@router.get("/history/stats")
|
||||
async def get_history_stats(
|
||||
start_time: str,
|
||||
end_time: str,
|
||||
timezone_offset: int = None
|
||||
):
|
||||
"""
|
||||
统计数据端点 - 用于图表分析
|
||||
|
||||
Returns:
|
||||
- 时序趋势数据
|
||||
- 日志级别分布
|
||||
- 状态码分布
|
||||
- 慢请求TOP10
|
||||
"""
|
||||
if timezone_offset is None or timezone_offset == 0:
|
||||
try:
|
||||
tz_str = str(TIMEZONE).strip()
|
||||
tz_hours = float(tz_str) if tz_str.startswith(('+', '-')) else float(tz_str)
|
||||
timezone_offset = int(tz_hours * 60)
|
||||
except (ValueError, TypeError):
|
||||
timezone_offset = 480
|
||||
|
||||
utc_start = local_to_utc(start_time, timezone_offset)
|
||||
utc_end = local_to_utc(end_time, timezone_offset)
|
||||
|
||||
# 统计日志级别分布
|
||||
logs = await system_log_storage.get_logs_by_time_range(utc_start, utc_end, None, 10000)
|
||||
level_dist = {}
|
||||
for log in logs:
|
||||
level = log.level or "INFO"
|
||||
level_dist[level] = level_dist.get(level, 0) + 1
|
||||
|
||||
# 统计HTTP状态码分布
|
||||
http_requests = await request_storage.get_requests_by_time_range(utc_start, utc_end, 10000)
|
||||
status_dist = {}
|
||||
slow_requests = []
|
||||
for req in http_requests:
|
||||
code_range = f"{req.status_code // 100}xx"
|
||||
status_dist[code_range] = status_dist.get(code_range, 0) + 1
|
||||
if req.is_slow:
|
||||
slow_requests.append({
|
||||
"path": req.path,
|
||||
"method": req.method,
|
||||
"duration": req.response_time
|
||||
})
|
||||
|
||||
slow_requests.sort(key=lambda x: x["duration"], reverse=True)
|
||||
slow_top10 = slow_requests[:10]
|
||||
|
||||
from collections import defaultdict
|
||||
hourly_counts = defaultdict(lambda: {"requests": 0, "errors": 0})
|
||||
|
||||
for req in http_requests:
|
||||
hour = req.timestamp.strftime("%Y-%m-%d %H:00")
|
||||
hourly_counts[hour]["requests"] += 1
|
||||
if req.is_error:
|
||||
hourly_counts[hour]["errors"] += 1
|
||||
|
||||
trend_data = [
|
||||
{"time": k, "requests": v["requests"], "errors": v["errors"]}
|
||||
for k, v in sorted(hourly_counts.items())
|
||||
]
|
||||
|
||||
return {
|
||||
"trend": trend_data,
|
||||
"level_distribution": level_dist,
|
||||
"status_distribution": status_dist,
|
||||
"slow_requests": slow_top10,
|
||||
"summary": {
|
||||
"total_logs": len(logs),
|
||||
"total_requests": len(http_requests),
|
||||
"error_count": sum(1 for r in http_requests if r.is_error),
|
||||
"slow_count": len(slow_requests)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# ========== 时间线端点 ==========
|
||||
|
||||
@router.get("/history/timeline")
|
||||
async def get_history_timeline(
|
||||
start_time: str,
|
||||
end_time: str,
|
||||
timezone_offset: int = None,
|
||||
level: str = None,
|
||||
limit: int = 500,
|
||||
module: str = None,
|
||||
keyword: str = None,
|
||||
status_code_min: int = None,
|
||||
status_code_max: int = None,
|
||||
duration_min: float = None,
|
||||
duration_max: float = None,
|
||||
client_ip: str = None,
|
||||
method: str = None
|
||||
):
|
||||
"""
|
||||
时间线查询端点 - 合并三源数据并按时间排序
|
||||
|
||||
Args:
|
||||
start_time: 开始时间(本地时间)
|
||||
end_time: 结束时间(本地时间)
|
||||
timezone_offset: 时区偏移分钟数
|
||||
level: 日志级别过滤
|
||||
limit: 返回数量限制
|
||||
其他过滤参数同history/query
|
||||
|
||||
Returns:
|
||||
按时间排序的混合事件列表
|
||||
"""
|
||||
# 时区处理
|
||||
if timezone_offset is None or timezone_offset == 0:
|
||||
try:
|
||||
tz_str = str(TIMEZONE).strip()
|
||||
if tz_str.startswith(('+', '-')):
|
||||
tz_hours = float(tz_str)
|
||||
else:
|
||||
tz_hours = float(tz_str)
|
||||
timezone_offset = int(tz_hours * 60)
|
||||
except (ValueError, TypeError):
|
||||
timezone_offset = 480
|
||||
|
||||
utc_start = local_to_utc(start_time, timezone_offset)
|
||||
utc_end = local_to_utc(end_time, timezone_offset)
|
||||
|
||||
filter_params = {
|
||||
"module": module.split(',') if module else None,
|
||||
"keyword": keyword,
|
||||
"status_code_min": status_code_min,
|
||||
"status_code_max": status_code_max,
|
||||
"duration_min": duration_min,
|
||||
"duration_max": duration_max,
|
||||
"client_ip": client_ip,
|
||||
"method": method
|
||||
}
|
||||
|
||||
# 并发查询三源数据
|
||||
http_task = request_storage.get_requests_with_filters(utc_start, utc_end, limit, filter_params)
|
||||
outbound_task = outbound_request_storage.get_requests_with_filters(utc_start, utc_end, limit, filter_params)
|
||||
logs_task = system_log_storage.get_logs_with_filters(utc_start, utc_end, level, limit, filter_params)
|
||||
|
||||
http_requests, outbound_requests, logs = await asyncio.gather(
|
||||
http_task, outbound_task, logs_task
|
||||
)
|
||||
|
||||
# 转换为统一事件格式
|
||||
events = []
|
||||
|
||||
# HTTP请求事件
|
||||
for req in http_requests:
|
||||
events.append({
|
||||
"id": f"http_{req.id}",
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
"type": "http",
|
||||
"icon": "📥",
|
||||
"level": "INFO" if req.status_code < 400 else "ERROR",
|
||||
"summary": f"{req.method} {req.path}",
|
||||
"detail": {
|
||||
"method": req.method,
|
||||
"path": req.path,
|
||||
"status_code": req.status_code,
|
||||
"duration": req.response_time,
|
||||
"client_ip": req.client_ip,
|
||||
"is_error": req.is_error,
|
||||
"is_slow": req.is_slow
|
||||
}
|
||||
})
|
||||
|
||||
# 发送请求事件
|
||||
for req in outbound_requests:
|
||||
events.append({
|
||||
"id": f"outbound_{req.id}",
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
"type": "outbound",
|
||||
"icon": "📤",
|
||||
"level": "INFO" if req.status_code < 400 else "ERROR",
|
||||
"summary": f"{req.method} {req.url[:80]}",
|
||||
"detail": {
|
||||
"method": req.method,
|
||||
"url": req.url,
|
||||
"status_code": req.status_code,
|
||||
"duration": req.duration * 1000,
|
||||
"module": req.module,
|
||||
"is_error": req.is_error,
|
||||
"is_slow": req.is_slow
|
||||
}
|
||||
})
|
||||
|
||||
# 系统日志事件
|
||||
for log in logs:
|
||||
icon_map = {
|
||||
"DEBUG": "🔍",
|
||||
"INFO": "📝",
|
||||
"WARNING": "⚠️",
|
||||
"ERROR": "❌",
|
||||
"CRITICAL": "🔥"
|
||||
}
|
||||
events.append({
|
||||
"id": f"log_{log.id}",
|
||||
"timestamp": log.timestamp.isoformat(),
|
||||
"type": "log",
|
||||
"icon": icon_map.get(log.level, "📝"),
|
||||
"level": log.level,
|
||||
"summary": log.message[:100] if log.message else "",
|
||||
"detail": {
|
||||
"level": log.level,
|
||||
"module": log.module,
|
||||
"function": log.function,
|
||||
"line_number": log.line_number,
|
||||
"message": log.message,
|
||||
"stack_trace": log.stack_trace
|
||||
}
|
||||
})
|
||||
|
||||
# 按时间戳排序(降序)
|
||||
events.sort(key=lambda x: x["timestamp"], reverse=True)
|
||||
|
||||
# 应用限制
|
||||
if len(events) > limit:
|
||||
events = events[:limit]
|
||||
|
||||
return {
|
||||
"events": events,
|
||||
"count": len(events),
|
||||
"time_range": {
|
||||
"original": {"start": start_time, "end": end_time},
|
||||
"converted": {"start": utc_start.isoformat(), "end": utc_end.isoformat()}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# ========== 增量拉取端点 ==========
|
||||
|
||||
@router.get("/history/recent")
|
||||
async def get_recent_data(
|
||||
since_timestamp: str = None,
|
||||
data_type: str = "logs",
|
||||
limit: int = 100
|
||||
):
|
||||
"""
|
||||
增量拉取端点 - 用于实时追踪
|
||||
|
||||
Args:
|
||||
since_timestamp: 上次拉取的时间戳(ISO格式),拉取此时间之后的数据
|
||||
data_type: 数据类型(http/outbound/logs)
|
||||
limit: 返回数量限制
|
||||
|
||||
Returns:
|
||||
新增数据和最新时间戳
|
||||
"""
|
||||
if not since_timestamp:
|
||||
# 无时间戳时返回最近数据
|
||||
since = datetime.now(timezone.utc) - timedelta(minutes=5)
|
||||
else:
|
||||
since = datetime.fromisoformat(since_timestamp.replace('Z', '+00:00'))
|
||||
|
||||
latest_timestamp = since.isoformat()
|
||||
|
||||
result = {"events": [], "latest_timestamp": latest_timestamp}
|
||||
|
||||
if data_type == "logs":
|
||||
logs = await SystemLog.filter(timestamp__gt=since).limit(limit).order_by('-timestamp').all()
|
||||
if logs:
|
||||
latest_timestamp = logs[0].timestamp.isoformat()
|
||||
result["events"] = [{
|
||||
"id": log.id,
|
||||
"timestamp": log.timestamp.isoformat(),
|
||||
"level": log.level,
|
||||
"module": log.module,
|
||||
"message": log.message
|
||||
} for log in logs]
|
||||
|
||||
elif data_type == "http":
|
||||
requests = await APIRequest.filter(timestamp__gt=since).limit(limit).order_by('-timestamp').all()
|
||||
if requests:
|
||||
latest_timestamp = requests[0].timestamp.isoformat()
|
||||
result["events"] = [{
|
||||
"id": req.id,
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
"method": req.method,
|
||||
"path": req.path,
|
||||
"status_code": req.status_code,
|
||||
"duration": req.response_time
|
||||
} for req in requests]
|
||||
|
||||
elif data_type == "outbound":
|
||||
requests = await OutboundAPIRequest.filter(timestamp__gt=since).limit(limit).order_by('-timestamp').all()
|
||||
if requests:
|
||||
latest_timestamp = requests[0].timestamp.isoformat()
|
||||
result["events"] = [{
|
||||
"id": req.id,
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
"method": req.method,
|
||||
"url": req.url,
|
||||
"status_code": req.status_code,
|
||||
"duration": req.duration * 1000
|
||||
} for req in requests]
|
||||
|
||||
result["latest_timestamp"] = latest_timestamp
|
||||
result["count"] = len(result["events"])
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ========== 导出端点 ==========
|
||||
|
||||
import csv
|
||||
import io
|
||||
|
||||
|
||||
@router.get("/history/export")
|
||||
async def export_history_data(
|
||||
start_time: str,
|
||||
end_time: str,
|
||||
timezone_offset: int = None,
|
||||
level: str = None,
|
||||
type: str = "logs",
|
||||
format: str = "csv",
|
||||
module: str = None,
|
||||
keyword: str = None,
|
||||
status_code_min: int = None,
|
||||
status_code_max: int = None,
|
||||
duration_min: float = None,
|
||||
duration_max: float = None,
|
||||
client_ip: str = None,
|
||||
method: str = None
|
||||
):
|
||||
"""
|
||||
导出历史查询数据(CSV或JSON格式)
|
||||
|
||||
Args:
|
||||
start_time: 开始时间
|
||||
end_time: 结束时间
|
||||
timezone_offset: 时区偏移
|
||||
level: 日志级别
|
||||
type: 数据类型(http/outbound/logs)
|
||||
format: 导出格式(csv/json)
|
||||
其他过滤参数同查询接口
|
||||
|
||||
Returns:
|
||||
StreamingResponse: 文件流
|
||||
"""
|
||||
# 时区处理
|
||||
if timezone_offset is None or timezone_offset == 0:
|
||||
try:
|
||||
tz_str = str(TIMEZONE).strip()
|
||||
if tz_str.startswith(('+', '-')):
|
||||
tz_hours = float(tz_str)
|
||||
else:
|
||||
tz_hours = float(tz_str)
|
||||
timezone_offset = int(tz_hours * 60)
|
||||
except (ValueError, TypeError):
|
||||
timezone_offset = 480
|
||||
|
||||
utc_start = local_to_utc(start_time, timezone_offset)
|
||||
utc_end = local_to_utc(end_time, timezone_offset)
|
||||
|
||||
filter_params = {
|
||||
"module": module.split(',') if module else None,
|
||||
"keyword": keyword,
|
||||
"status_code_min": status_code_min,
|
||||
"status_code_max": status_code_max,
|
||||
"duration_min": duration_min,
|
||||
"duration_max": duration_max,
|
||||
"client_ip": client_ip,
|
||||
"method": method
|
||||
}
|
||||
|
||||
limit = 10000 # 导出最大条数
|
||||
|
||||
# 查询数据
|
||||
if type == "http":
|
||||
data = await request_storage.get_requests_with_filters(utc_start, utc_end, limit, filter_params)
|
||||
records = [{
|
||||
"时间": req.timestamp.isoformat(),
|
||||
"方法": req.method,
|
||||
"路径": req.path,
|
||||
"状态码": req.status_code,
|
||||
"响应时间(ms)": req.response_time,
|
||||
"客户端IP": req.client_ip or "",
|
||||
"是否慢请求": "是" if req.is_slow else "否",
|
||||
"是否错误": "是" if req.is_error else "否",
|
||||
"错误信息": req.error_message or ""
|
||||
} for req in data]
|
||||
filename_prefix = "http_requests"
|
||||
|
||||
elif type == "outbound":
|
||||
data = await outbound_request_storage.get_requests_with_filters(utc_start, utc_end, limit, filter_params)
|
||||
records = [{
|
||||
"时间": req.timestamp.isoformat(),
|
||||
"方法": req.method,
|
||||
"URL": req.url,
|
||||
"状态码": req.status_code,
|
||||
"响应时间(ms)": req.duration * 1000,
|
||||
"模块": req.module or "",
|
||||
"是否慢请求": "是" if req.is_slow else "否",
|
||||
"是否错误": "是" if req.is_error else "否",
|
||||
"错误信息": req.error_message or ""
|
||||
} for req in data]
|
||||
filename_prefix = "outbound_requests"
|
||||
|
||||
else: # logs
|
||||
data = await system_log_storage.get_logs_with_filters(utc_start, utc_end, level, limit, filter_params)
|
||||
records = [{
|
||||
"时间": log.timestamp.isoformat(),
|
||||
"级别": log.level,
|
||||
"模块": log.module,
|
||||
"函数": log.function,
|
||||
"行号": log.line_number,
|
||||
"消息": log.message
|
||||
} for log in data]
|
||||
filename_prefix = "system_logs"
|
||||
|
||||
if not records:
|
||||
raise HTTPException(status_code=404, detail="无数据可导出")
|
||||
|
||||
# 生成文件名
|
||||
time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{filename_prefix}_{time_str}.{format}"
|
||||
|
||||
# 流式生成响应
|
||||
if format == "csv":
|
||||
async def generate_csv():
|
||||
output = io.StringIO()
|
||||
if records:
|
||||
writer = csv.DictWriter(output, fieldnames=records[0].keys())
|
||||
writer.writeheader()
|
||||
for record in records:
|
||||
writer.writerow(record)
|
||||
yield output.getvalue()
|
||||
|
||||
return StreamingResponse(
|
||||
generate_csv(),
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f"attachment; filename={filename}"}
|
||||
)
|
||||
|
||||
else: # json
|
||||
async def generate_json():
|
||||
yield json.dumps(records, ensure_ascii=False, indent=2)
|
||||
|
||||
return StreamingResponse(
|
||||
generate_json(),
|
||||
media_type="application/json",
|
||||
headers={"Content-Disposition": f"attachment; filename={filename}"}
|
||||
)
|
||||
|
||||
@@ -146,3 +146,29 @@ class EventMetrics(BaseModel):
|
||||
timestamp: float = Field(description="时间戳")
|
||||
event_stats: Dict[str, EventTypeStats] = Field(default_factory=dict, description="各事件类型统计")
|
||||
summary: Dict[str, Any] = Field(default_factory=dict, description="汇总信息")
|
||||
|
||||
|
||||
class PaginationMeta(BaseModel):
|
||||
"""分页元数据"""
|
||||
page: int = Field(description="当前页码(从1开始)")
|
||||
page_size: int = Field(description="每页条数")
|
||||
total_count: int = Field(description="总记录数")
|
||||
total_pages: int = Field(description="总页数")
|
||||
has_next: bool = Field(description="是否有下一页")
|
||||
has_prev: bool = Field(description="是否有上一页")
|
||||
start_index: int = Field(description="当前页起始索引")
|
||||
end_index: int = Field(description="当前页结束索引")
|
||||
|
||||
|
||||
class QueryResponse(BaseModel):
|
||||
"""查询响应基类(包含分页)"""
|
||||
pagination: Optional[PaginationMeta] = Field(None, description="分页元数据")
|
||||
time_range: Optional[Dict[str, Any]] = Field(None, description="时间范围")
|
||||
filter_params: Optional[Dict[str, Any]] = Field(None, description="过滤条件")
|
||||
|
||||
|
||||
class HistoryQueryResponse(QueryResponse):
|
||||
"""历史查询响应"""
|
||||
http_requests: List[Dict[str, Any]] = Field(default_factory=list, description="接收请求列表")
|
||||
outbound_requests: List[Dict[str, Any]] = Field(default_factory=list, description="发送请求列表")
|
||||
logs: List[Dict[str, Any]] = Field(default_factory=list, description="系统日志列表")
|
||||
|
||||
+380
-12
@@ -33,7 +33,7 @@ def should_record_to_db(required_level: str) -> bool:
|
||||
class RequestStorage:
|
||||
"""请求数据存储服务"""
|
||||
|
||||
async def get_requests_by_time_range(self, start_time: datetime, end_time: datetime, limit: int = 1000) -> List[APIRequest]:
|
||||
async def get_requests_by_time_range(self, start_time: datetime = None, end_time: datetime = None, limit: int = 1000) -> List[APIRequest]:
|
||||
"""
|
||||
按时间范围查询请求记录
|
||||
|
||||
@@ -46,14 +46,154 @@ class RequestStorage:
|
||||
请求记录列表
|
||||
"""
|
||||
try:
|
||||
requests = await APIRequest.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
).limit(limit).order_by('-timestamp').all()
|
||||
query = APIRequest.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
requests = await query.limit(limit).order_by('-timestamp').all()
|
||||
return requests
|
||||
except Exception as e:
|
||||
print(f"按时间范围获取请求数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def get_requests_with_filters(
|
||||
self,
|
||||
start_time: datetime = None,
|
||||
end_time: datetime = None,
|
||||
limit: int = 1000,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> List[APIRequest]:
|
||||
"""
|
||||
按时间范围和多维度过滤条件查询请求记录(支持分页)
|
||||
|
||||
Args:
|
||||
start_time: 开始时间(UTC datetime)
|
||||
end_time: 结束时间(UTC datetime)
|
||||
limit: 返回数量限制(无分页时使用)
|
||||
filter_params: 过滤条件字典,包含:
|
||||
- module: 模块名列表(对API请求不适用,忽略)
|
||||
- keyword: 关键词搜索(匹配path)
|
||||
- status_code_min/max: 状态码范围
|
||||
- duration_min/max: 响应时间范围(毫秒)
|
||||
- client_ip: 客户端IP
|
||||
- method: HTTP方法
|
||||
- page: 页码(从1开始)
|
||||
- page_size: 每页条数
|
||||
- sort_by: 排序字段
|
||||
- sort_order: 排序方向
|
||||
|
||||
Returns:
|
||||
请求记录列表
|
||||
"""
|
||||
try:
|
||||
query = APIRequest.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if filter_params:
|
||||
# 状态码范围过滤
|
||||
if filter_params.get('status_code_min'):
|
||||
query = query.filter(status_code__gte=filter_params['status_code_min'])
|
||||
if filter_params.get('status_code_max'):
|
||||
query = query.filter(status_code__lte=filter_params['status_code_max'])
|
||||
|
||||
# 响应时间范围过滤(毫秒)
|
||||
if filter_params.get('duration_min'):
|
||||
query = query.filter(response_time__gte=filter_params['duration_min'])
|
||||
if filter_params.get('duration_max'):
|
||||
query = query.filter(response_time__lte=filter_params['duration_max'])
|
||||
|
||||
# 客户端IP过滤
|
||||
if filter_params.get('client_ip'):
|
||||
query = query.filter(client_ip=filter_params['client_ip'])
|
||||
|
||||
# HTTP方法过滤
|
||||
if filter_params.get('method'):
|
||||
query = query.filter(method=filter_params['method'].upper())
|
||||
|
||||
# 关键词搜索(匹配path、request_body、response_body)
|
||||
if filter_params.get('keyword'):
|
||||
keyword = filter_params['keyword']
|
||||
from tortoise.expressions import Q
|
||||
query = query.filter(
|
||||
Q(path__icontains=keyword) |
|
||||
Q(request_body__icontains=keyword) |
|
||||
Q(response_body__icontains=keyword)
|
||||
)
|
||||
|
||||
# 排序
|
||||
sort_by = (filter_params.get('sort_by') if filter_params else None) or 'timestamp'
|
||||
sort_order = (filter_params.get('sort_order') if filter_params else None) or 'desc'
|
||||
|
||||
if sort_by == 'duration':
|
||||
sort_field = 'response_time'
|
||||
elif sort_by == 'status_code':
|
||||
sort_field = 'status_code'
|
||||
else:
|
||||
sort_field = 'timestamp'
|
||||
|
||||
order_str = f"-{sort_field}" if sort_order == 'desc' else sort_field
|
||||
query = query.order_by(order_str)
|
||||
|
||||
# 分页
|
||||
page = filter_params.get('page') if filter_params else None
|
||||
page_size = filter_params.get('page_size') if filter_params else None
|
||||
|
||||
if page and page_size:
|
||||
offset = (page - 1) * page_size
|
||||
requests = await query.offset(offset).limit(page_size).all()
|
||||
else:
|
||||
requests = await query.limit(limit).all()
|
||||
|
||||
return requests
|
||||
except Exception as e:
|
||||
print(f"多条件过滤查询请求数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def count_requests_with_filters(
|
||||
self,
|
||||
start_time: datetime = None,
|
||||
end_time: datetime = None,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> int:
|
||||
"""计数查询(用于分页)"""
|
||||
try:
|
||||
query = APIRequest.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if filter_params:
|
||||
if filter_params.get('status_code_min'):
|
||||
query = query.filter(status_code__gte=filter_params['status_code_min'])
|
||||
if filter_params.get('status_code_max'):
|
||||
query = query.filter(status_code__lte=filter_params['status_code_max'])
|
||||
if filter_params.get('duration_min'):
|
||||
query = query.filter(response_time__gte=filter_params['duration_min'])
|
||||
if filter_params.get('duration_max'):
|
||||
query = query.filter(response_time__lte=filter_params['duration_max'])
|
||||
if filter_params.get('client_ip'):
|
||||
query = query.filter(client_ip=filter_params['client_ip'])
|
||||
if filter_params.get('method'):
|
||||
query = query.filter(method=filter_params['method'].upper())
|
||||
if filter_params.get('keyword'):
|
||||
query = query.filter(path__icontains=filter_params['keyword'])
|
||||
|
||||
return await query.count()
|
||||
except Exception as e:
|
||||
print(f"计数查询失败: {e}")
|
||||
return 0
|
||||
|
||||
async def save_request(self, request_data: Dict[str, Any]) -> APIRequest:
|
||||
"""
|
||||
@@ -172,7 +312,7 @@ class RequestStorage:
|
||||
class OutboundRequestStorage:
|
||||
"""对外请求数据存储服务"""
|
||||
|
||||
async def get_requests_by_time_range(self, start_time: datetime, end_time: datetime, limit: int = 1000) -> List[OutboundAPIRequest]:
|
||||
async def get_requests_by_time_range(self, start_time: datetime = None, end_time: datetime = None, limit: int = 1000) -> List[OutboundAPIRequest]:
|
||||
"""
|
||||
按时间范围查询对外请求记录
|
||||
|
||||
@@ -193,6 +333,125 @@ class OutboundRequestStorage:
|
||||
except Exception as e:
|
||||
print(f"按时间范围获取对外请求数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def get_requests_with_filters(
|
||||
self,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
limit: int = 1000,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> List[OutboundAPIRequest]:
|
||||
"""
|
||||
按时间范围和多维度过滤条件查询对外请求记录
|
||||
|
||||
Args:
|
||||
start_time: 开始时间(UTC datetime)
|
||||
end_time: 结束时间(UTC datetime)
|
||||
limit: 返回数量限制
|
||||
filter_params: 过滤条件字典
|
||||
|
||||
Returns:
|
||||
对外请求记录列表
|
||||
"""
|
||||
try:
|
||||
query = OutboundAPIRequest.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if filter_params:
|
||||
# 模块过滤(IN查询)
|
||||
if filter_params.get('module'):
|
||||
query = query.filter(module__in=filter_params['module'])
|
||||
|
||||
# 状态码范围过滤
|
||||
if filter_params.get('status_code_min'):
|
||||
query = query.filter(status_code__gte=filter_params['status_code_min'])
|
||||
if filter_params.get('status_code_max'):
|
||||
query = query.filter(status_code__lte=filter_params['status_code_max'])
|
||||
|
||||
# 响应时间范围过滤(毫秒转秒)
|
||||
if filter_params.get('duration_min'):
|
||||
query = query.filter(duration__gte=filter_params['duration_min'] / 1000)
|
||||
if filter_params.get('duration_max'):
|
||||
query = query.filter(duration__lte=filter_params['duration_max'] / 1000)
|
||||
|
||||
# HTTP方法过滤
|
||||
if filter_params.get('method'):
|
||||
query = query.filter(method=filter_params['method'].upper())
|
||||
|
||||
# 关键词搜索(匹配url、request_body、response_body)
|
||||
if filter_params.get('keyword'):
|
||||
keyword = filter_params['keyword']
|
||||
from tortoise.expressions import Q
|
||||
query = query.filter(
|
||||
Q(url__icontains=keyword) |
|
||||
Q(request_body__icontains=keyword) |
|
||||
Q(response_body__icontains=keyword)
|
||||
)
|
||||
|
||||
# 排序
|
||||
sort_by = filter_params.get('sort_by', 'timestamp') if filter_params else 'timestamp'
|
||||
sort_order = filter_params.get('sort_order', 'desc') if filter_params else 'desc'
|
||||
|
||||
if sort_by == 'duration':
|
||||
sort_field = 'duration'
|
||||
elif sort_by == 'status_code':
|
||||
sort_field = 'status_code'
|
||||
else:
|
||||
sort_field = 'timestamp'
|
||||
|
||||
order_str = f"-{sort_field}" if sort_order == 'desc' else sort_field
|
||||
query = query.order_by(order_str)
|
||||
|
||||
# 分页
|
||||
page = filter_params.get('page') if filter_params else None
|
||||
page_size = filter_params.get('page_size') if filter_params else None
|
||||
|
||||
if page and page_size:
|
||||
offset = (page - 1) * page_size
|
||||
requests = await query.offset(offset).limit(page_size).all()
|
||||
else:
|
||||
requests = await query.limit(limit).all()
|
||||
|
||||
return requests
|
||||
except Exception as e:
|
||||
print(f"多条件过滤查询对外请求数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def count_requests_with_filters(
|
||||
self,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> int:
|
||||
"""计数查询(用于分页)"""
|
||||
try:
|
||||
query = OutboundAPIRequest.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if filter_params:
|
||||
if filter_params.get('module'):
|
||||
query = query.filter(module__in=filter_params['module'])
|
||||
if filter_params.get('status_code_min'):
|
||||
query = query.filter(status_code__gte=filter_params['status_code_min'])
|
||||
if filter_params.get('status_code_max'):
|
||||
query = query.filter(status_code__lte=filter_params['status_code_max'])
|
||||
if filter_params.get('duration_min'):
|
||||
query = query.filter(duration__gte=filter_params['duration_min'] / 1000)
|
||||
if filter_params.get('duration_max'):
|
||||
query = query.filter(duration__lte=filter_params['duration_max'] / 1000)
|
||||
if filter_params.get('method'):
|
||||
query = query.filter(method=filter_params['method'].upper())
|
||||
if filter_params.get('keyword'):
|
||||
query = query.filter(url__icontains=filter_params['keyword'])
|
||||
|
||||
return await query.count()
|
||||
except Exception as e:
|
||||
print(f"计数查询失败: {e}")
|
||||
return 0
|
||||
|
||||
async def save_request(self, request_data: Dict[str, Any]) -> OutboundAPIRequest:
|
||||
"""
|
||||
@@ -295,7 +554,7 @@ class OutboundRequestStorage:
|
||||
class SystemLogStorage:
|
||||
"""系统日志存储服务"""
|
||||
|
||||
async def get_logs_by_time_range(self, start_time: datetime, end_time: datetime, level: str = None, limit: int = 1000) -> List[SystemLog]:
|
||||
async def get_logs_by_time_range(self, start_time: datetime = None, end_time: datetime = None, level: str = None, limit: int = 1000) -> List[SystemLog]:
|
||||
"""
|
||||
按时间范围查询系统日志记录
|
||||
|
||||
@@ -309,13 +568,20 @@ class SystemLogStorage:
|
||||
系统日志记录列表
|
||||
"""
|
||||
try:
|
||||
query = SystemLog.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
query = SystemLog.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if level:
|
||||
query = query.filter(level=level.upper())
|
||||
level_order = {'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}
|
||||
min_level = level_order.get(level.upper(), 0)
|
||||
valid_levels = [lv for lv, num in level_order.items() if num >= min_level]
|
||||
if valid_levels:
|
||||
query = query.filter(level__in=valid_levels)
|
||||
|
||||
logs = await query.limit(limit).order_by('-timestamp').all()
|
||||
return logs
|
||||
@@ -323,6 +589,108 @@ class SystemLogStorage:
|
||||
print(f"按时间范围获取系统日志数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def get_logs_with_filters(
|
||||
self,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
level: str = None,
|
||||
limit: int = 1000,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> List[SystemLog]:
|
||||
"""
|
||||
按时间范围和多维度过滤条件查询系统日志记录
|
||||
|
||||
Args:
|
||||
start_time: 开始时间(UTC datetime)
|
||||
end_time: 结束时间(UTC datetime)
|
||||
level: 日志级别过滤(可选)
|
||||
limit: 返回数量限制
|
||||
filter_params: 过滤条件字典
|
||||
|
||||
Returns:
|
||||
系统日志记录列表
|
||||
"""
|
||||
try:
|
||||
query = SystemLog.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if level:
|
||||
level_order = {'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}
|
||||
min_level = level_order.get(level.upper(), 0)
|
||||
valid_levels = [lv for lv, num in level_order.items() if num >= min_level]
|
||||
if valid_levels:
|
||||
query = query.filter(level__in=valid_levels)
|
||||
|
||||
if filter_params:
|
||||
# 模块过滤(IN查询)
|
||||
if filter_params.get('module'):
|
||||
query = query.filter(module__in=filter_params['module'])
|
||||
|
||||
# 关键词搜索(匹配message)
|
||||
if filter_params.get('keyword'):
|
||||
query = query.filter(message__icontains=filter_params['keyword'])
|
||||
|
||||
# 排序
|
||||
sort_by = (filter_params.get('sort_by') if filter_params else None) or 'timestamp'
|
||||
sort_order = (filter_params.get('sort_order') if filter_params else None) or 'desc'
|
||||
order_str = f"-{sort_by}" if sort_order == 'desc' else sort_by
|
||||
query = query.order_by(order_str)
|
||||
|
||||
# 分页
|
||||
page = filter_params.get('page') if filter_params else None
|
||||
page_size = filter_params.get('page_size') if filter_params else None
|
||||
|
||||
if page and page_size:
|
||||
offset = (page - 1) * page_size
|
||||
logs = await query.offset(offset).limit(page_size).all()
|
||||
else:
|
||||
logs = await query.limit(limit).all()
|
||||
|
||||
return logs
|
||||
except Exception as e:
|
||||
print(f"多条件过滤查询系统日志数据失败: {e}")
|
||||
return []
|
||||
|
||||
async def count_logs_with_filters(
|
||||
self,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
level: str = None,
|
||||
filter_params: Dict[str, Any] = None
|
||||
) -> int:
|
||||
"""计数查询(用于分页)"""
|
||||
try:
|
||||
query = SystemLog.filter()
|
||||
|
||||
if start_time and end_time:
|
||||
query = query.filter(
|
||||
timestamp__gte=start_time,
|
||||
timestamp__lte=end_time
|
||||
)
|
||||
|
||||
if level:
|
||||
level_order = {'DEBUG': 10, 'INFO': 20, 'WARNING': 30, 'ERROR': 40, 'CRITICAL': 50}
|
||||
min_level = level_order.get(level.upper(), 0)
|
||||
valid_levels = [lv for lv, num in level_order.items() if num >= min_level]
|
||||
if valid_levels:
|
||||
query = query.filter(level__in=valid_levels)
|
||||
|
||||
if filter_params:
|
||||
if filter_params.get('module'):
|
||||
query = query.filter(module__in=filter_params['module'])
|
||||
if filter_params.get('keyword'):
|
||||
query = query.filter(message__icontains=filter_params['keyword'])
|
||||
|
||||
return await query.count()
|
||||
except Exception as e:
|
||||
print(f"计数查询失败: {e}")
|
||||
return 0
|
||||
|
||||
async def clean_old_data(self, days: int = LOG_RETENTION):
|
||||
"""清理指定天数前的系统日志数据"""
|
||||
from datetime import timedelta
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Dict, Any, List, Tuple
|
||||
from typing import Dict, Any, List, Tuple, Literal
|
||||
import enum
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ class LoggerConfig(BaseModel):
|
||||
queue_size=get_int('LOG_QUEUE_SIZE', 10000),
|
||||
batch_size=get_int('LOG_BATCH_SIZE', 100),
|
||||
flush_interval=get_float('LOG_FLUSH_INTERVAL', 1.0),
|
||||
stack_trace=get_bool('LOG_STACK_TRACE', False)
|
||||
stack_trace=get_bool('LOG_STACK_TRACE', True)
|
||||
)
|
||||
|
||||
def get_level_int(self) -> int:
|
||||
|
||||
@@ -4,8 +4,10 @@
|
||||
|
||||
| 文件 | 用途 |
|
||||
|------|------|
|
||||
| `migrate_all_in_one.bat` | ✨ **唯一需要运行的脚本** |
|
||||
| `migrate_with_tortoise.py` | Python 脚本(被 bat 调用) |
|
||||
| `migrate_all_in_one.bat` / `.sh` | ✨ **唯一需要运行的脚本** |
|
||||
| `auto_migrate.py` | 自动迁移脚本 |
|
||||
| `migrate_with_tortoise.py` | Tortoise 表创建脚本 |
|
||||
| `add_log_query_indexes.py` | 日志查询索引优化脚本 |
|
||||
| `monitor_orm_config.py` | ORM 配置 |
|
||||
| `README.md` | 本说明文件 |
|
||||
|
||||
@@ -14,8 +16,15 @@
|
||||
## 🚀 使用方法
|
||||
|
||||
**只需要运行一个文件:**
|
||||
|
||||
**Windows:**
|
||||
```
|
||||
scripts/migrate/migrate_all_in_one.bat
|
||||
scripts\migrate\migrate_all_in_one.bat
|
||||
```
|
||||
|
||||
**Linux:**
|
||||
```
|
||||
./scripts/migrate/migrate_all_in_one.sh
|
||||
```
|
||||
|
||||
---
|
||||
@@ -24,9 +33,11 @@ scripts/migrate/migrate_all_in_one.bat
|
||||
|
||||
| 选项 | 功能 | 推荐使用场景 |
|
||||
|------|------|--------------|
|
||||
| **[1]** | 使用 Tortoise 直接生成表 | ✅ **日常新增/更新模型(最可靠)** |
|
||||
| **[2]** | 使用 Aerich 迁移系统 | 需要版本回滚时 |
|
||||
| **[3]** | 重置所有迁移 | aerich 出问题时 |
|
||||
| **[1]** | Auto Migration(自动迁移) | ✅ **日常新增/更新模型(推荐)** |
|
||||
| **[2]** | Create tables with Tortoise | 仅创建新表 |
|
||||
| **[3]** | Reset migrations | 重置所有迁移 |
|
||||
| **[4]** | Add log query indexes | ✅ **优化日志查询性能** |
|
||||
| **[5]** | Backup only | 仅备份数据库 |
|
||||
| **[Q]** | 退出 | - |
|
||||
|
||||
---
|
||||
@@ -35,12 +46,18 @@ scripts/migrate/migrate_all_in_one.bat
|
||||
|
||||
### 新增/更新模型
|
||||
1. 在 `apps/common/monitor/models.py` 中添加/修改模型
|
||||
2. 运行 `migrate_all_in_one.bat`
|
||||
2. 运行迁移脚本
|
||||
3. 选择 **[1]**
|
||||
4. 完成!✅
|
||||
|
||||
### 优化日志查询性能
|
||||
1. 运行迁移脚本
|
||||
2. 选择 **[4]**
|
||||
3. 自动创建索引(client_ip, method, timestamp, level 等)
|
||||
4. 完成!✅
|
||||
|
||||
---
|
||||
|
||||
## 💡 推荐
|
||||
|
||||
**日常开发请优先使用 [1] Tortoise 方案** - 更简单、更可靠!
|
||||
**日常开发请优先使用 [1] Auto Migration** - 自动备份、更简单、更可靠!
|
||||
|
||||
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
数据库索引迁移脚本 - 日志查询功能强化
|
||||
任务ID: T1.1
|
||||
|
||||
为支持日志查询功能强化(高级过滤、分页排序),添加以下索引:
|
||||
1. api_requests: client_ip, method, (timestamp, status_code)
|
||||
2. outbound_api_requests: method, (timestamp, status_code)
|
||||
3. system_logs: (timestamp, level)
|
||||
|
||||
用法:
|
||||
# 执行迁移(创建索引)
|
||||
python scripts/migrate/add_log_query_indexes.py --action migrate
|
||||
|
||||
# 回滚迁移(删除索引)
|
||||
python scripts/migrate/add_log_query_indexes.py --action rollback
|
||||
|
||||
# 检查状态
|
||||
python scripts/migrate/add_log_query_indexes.py --action status
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
project_root = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
from tortoise import Tortoise
|
||||
from tortoise.backends.sqlite.client import SqliteClient
|
||||
from globalobjects import logger
|
||||
|
||||
INDEX_DEFINITIONS = {
|
||||
"api_requests": {
|
||||
"idx_api_requests_client_ip": ["client_ip"],
|
||||
"idx_api_requests_method": ["method"],
|
||||
"idx_api_requests_timestamp_status": ["timestamp", "status_code"],
|
||||
},
|
||||
"outbound_api_requests": {
|
||||
"idx_outbound_method": ["method"],
|
||||
"idx_outbound_timestamp_status": ["timestamp", "status_code"],
|
||||
},
|
||||
"system_logs": {
|
||||
"idx_logs_timestamp_level": ["timestamp", "level"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
async def get_db_client() -> SqliteClient:
|
||||
"""获取数据库客户端"""
|
||||
from core.settings import SQLITE_FILE
|
||||
|
||||
if not Tortoise._inited:
|
||||
await Tortoise.init(
|
||||
db_url=f"sqlite://{SQLITE_FILE}",
|
||||
modules={"models": ["apps.common.monitor.models"]},
|
||||
)
|
||||
|
||||
return Tortoise.get_connection("default")
|
||||
|
||||
|
||||
async def check_table_exists(client: SqliteClient, table: str) -> bool:
|
||||
"""检查表是否存在"""
|
||||
query = f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'"
|
||||
result = await client.execute_query(query)
|
||||
return len(result[1]) > 0
|
||||
|
||||
|
||||
async def check_index_exists(client: SqliteClient, table: str, index_name: str) -> bool:
|
||||
"""检查索引是否存在(SQLite)"""
|
||||
if not await check_table_exists(client, table):
|
||||
return False
|
||||
query = f"SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='{table}' AND name='{index_name}'"
|
||||
result = await client.execute_query(query)
|
||||
return len(result[1]) > 0
|
||||
|
||||
|
||||
async def create_index(client: SqliteClient, table: str, index_name: str, columns: list) -> bool:
|
||||
"""创建索引"""
|
||||
try:
|
||||
if not await check_table_exists(client, table):
|
||||
logger.info(f"表不存在,跳过: {table}")
|
||||
return False
|
||||
|
||||
if await check_index_exists(client, table, index_name):
|
||||
logger.info(f"索引已存在,跳过: {table}.{index_name}")
|
||||
return False
|
||||
|
||||
cols = ", ".join(columns)
|
||||
sql = f"CREATE INDEX {index_name} ON {table} ({cols})"
|
||||
await client.execute_query(sql)
|
||||
logger.success(f"创建索引成功", f"{table}.{index_name}", f"({cols})")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"创建索引失败: {table}.{index_name}", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
async def drop_index(client: SqliteClient, table: str, index_name: str) -> bool:
|
||||
"""删除索引"""
|
||||
try:
|
||||
if not await check_index_exists(client, table, index_name):
|
||||
logger.info(f"索引不存在,跳过: {table}.{index_name}")
|
||||
return False
|
||||
|
||||
sql = f"DROP INDEX {index_name}"
|
||||
await client.execute_query(sql)
|
||||
logger.success(f"删除索引成功", f"{table}.{index_name}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"删除索引失败: {table}.{index_name}", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
async def migrate():
|
||||
"""执行迁移:创建所有索引"""
|
||||
logger.start("执行索引迁移", "创建")
|
||||
|
||||
client = await get_db_client()
|
||||
|
||||
created_count = 0
|
||||
skipped_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for table, indexes in INDEX_DEFINITIONS.items():
|
||||
logger.info(f"处理表: {table}")
|
||||
for index_name, columns in indexes.items():
|
||||
result = await create_index(client, table, index_name, columns)
|
||||
if result:
|
||||
created_count += 1
|
||||
elif await check_index_exists(client, table, index_name):
|
||||
skipped_count += 1
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
logger.stop("索引迁移完成", f"创建:{created_count} 跳过:{skipped_count} 失败:{failed_count}")
|
||||
|
||||
if failed_count > 0:
|
||||
logger.error("部分索引创建失败,请检查日志")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def rollback():
|
||||
"""回滚迁移:删除所有索引"""
|
||||
logger.start("执行索引迁移", "回滚")
|
||||
|
||||
client = await get_db_client()
|
||||
|
||||
dropped_count = 0
|
||||
skipped_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for table, indexes in INDEX_DEFINITIONS.items():
|
||||
logger.info(f"处理表: {table}")
|
||||
for index_name in indexes.keys():
|
||||
result = await drop_index(client, table, index_name)
|
||||
if result:
|
||||
dropped_count += 1
|
||||
elif not await check_index_exists(client, table, index_name):
|
||||
skipped_count += 1
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
logger.stop("索引回滚完成", f"删除:{dropped_count} 跳过:{skipped_count} 失败:{failed_count}")
|
||||
|
||||
if failed_count > 0:
|
||||
logger.error("部分索引删除失败,请检查日志")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def status():
|
||||
"""检查索引状态"""
|
||||
logger.info("检查索引状态...")
|
||||
|
||||
client = await get_db_client()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("索引状态报告")
|
||||
print("=" * 60)
|
||||
|
||||
total_count = 0
|
||||
existing_count = 0
|
||||
table_not_exists_count = 0
|
||||
|
||||
for table, indexes in INDEX_DEFINITIONS.items():
|
||||
table_exists = await check_table_exists(client, table)
|
||||
print(f"\n表: {table}", "- 存在" if table_exists else "- 不存在")
|
||||
print("-" * 40)
|
||||
|
||||
if not table_exists:
|
||||
table_not_exists_count += len(indexes)
|
||||
for index_name, columns in indexes.items():
|
||||
print(f" {index_name:40} ⊘ 表不存在")
|
||||
continue
|
||||
|
||||
for index_name, columns in indexes.items():
|
||||
exists = await check_index_exists(client, table, index_name)
|
||||
status_str = "✓ 已创建" if exists else "✗ 未创建"
|
||||
print(f" {index_name:40} {status_str}")
|
||||
total_count += 1
|
||||
if exists:
|
||||
existing_count += 1
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print(f"总计: {existing_count}/{total_count} 个索引已创建, {table_not_exists_count} 个索引待表创建")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="日志查询功能强化 - 索引迁移脚本")
|
||||
parser.add_argument(
|
||||
"--action",
|
||||
choices=["migrate", "rollback", "status"],
|
||||
default="status",
|
||||
help="操作类型: migrate(创建), rollback(回滚), status(检查)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
if args.action == "migrate":
|
||||
success = await migrate()
|
||||
sys.exit(0 if success else 1)
|
||||
elif args.action == "rollback":
|
||||
success = await rollback()
|
||||
sys.exit(0 if success else 1)
|
||||
elif args.action == "status":
|
||||
await status()
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logger.exception(f"执行失败: {e}")
|
||||
sys.exit(1)
|
||||
finally:
|
||||
if Tortoise._inited:
|
||||
await Tortoise.close_connections()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -55,6 +55,9 @@ echo.
|
||||
echo [3] Reset migrations
|
||||
echo - Delete all migrations and re-init
|
||||
echo.
|
||||
echo [4] Add log query indexes
|
||||
echo - Optimize log query performance
|
||||
echo.
|
||||
echo [5] Backup only
|
||||
echo - Just backup database
|
||||
echo.
|
||||
@@ -67,6 +70,7 @@ if /i "%choice%"=="" goto :auto_migrate
|
||||
if /i "%choice%"=="1" goto :auto_migrate
|
||||
if /i "%choice%"=="2" goto :tortoise
|
||||
if /i "%choice%"=="3" goto :reset
|
||||
if /i "%choice%"=="4" goto :add_indexes
|
||||
if /i "%choice%"=="5" goto :backup_only
|
||||
if /i "%choice%"=="Q" goto :end
|
||||
if /i "%choice%"=="q" goto :end
|
||||
@@ -191,6 +195,22 @@ echo Migrations reset!
|
||||
echo ========================================
|
||||
goto :end
|
||||
|
||||
:add_indexes
|
||||
echo.
|
||||
echo ========================================
|
||||
echo [4] Add Log Query Indexes
|
||||
echo ========================================
|
||||
call :setup_env
|
||||
if errorlevel 1 goto :end
|
||||
|
||||
echo.
|
||||
echo [INFO] Creating indexes for log query optimization...
|
||||
echo.
|
||||
|
||||
%PYTHON_VENV_DIR%\Scripts\python.exe scripts\migrate\add_log_query_indexes.py --action migrate
|
||||
if errorlevel 1 goto :error
|
||||
goto :success
|
||||
|
||||
:setup_env
|
||||
cd /d "%~dp0\..\.."
|
||||
set "PROJECT_DIR="
|
||||
|
||||
@@ -64,6 +64,9 @@ show_menu() {
|
||||
echo " [3] Reset migrations"
|
||||
echo " - Delete all migrations and re-init"
|
||||
echo ""
|
||||
echo " [4] Add log query indexes"
|
||||
echo " - Optimize log query performance"
|
||||
echo ""
|
||||
echo " [5] Backup only"
|
||||
echo " - Just backup database"
|
||||
echo ""
|
||||
@@ -168,6 +171,20 @@ reset_migrations() {
|
||||
echo "========================================"
|
||||
}
|
||||
|
||||
# 添加日志查询索引
|
||||
add_log_indexes() {
|
||||
echo ""
|
||||
echo "========================================"
|
||||
echo " Add Log Query Indexes"
|
||||
echo "========================================"
|
||||
|
||||
echo ""
|
||||
echo "[INFO] Creating indexes for log query optimization..."
|
||||
echo ""
|
||||
|
||||
$PYTHON_CMD scripts/migrate/add_log_query_indexes.py --action migrate
|
||||
}
|
||||
|
||||
# 主函数
|
||||
main() {
|
||||
local choice="${1:-}"
|
||||
@@ -188,6 +205,9 @@ main() {
|
||||
3)
|
||||
reset_migrations
|
||||
;;
|
||||
4)
|
||||
add_log_indexes
|
||||
;;
|
||||
5)
|
||||
backup_db
|
||||
;;
|
||||
|
||||
@@ -2382,10 +2382,41 @@ td {
|
||||
|
||||
.datetime-input {
|
||||
padding: 6px 12px;
|
||||
border: 1px solid var(--border-color);
|
||||
border-radius: 4px;
|
||||
font-size: 14px;
|
||||
font-family: var(--mono-font);
|
||||
border: 2px solid rgba(255, 255, 255, 0.3);
|
||||
border-radius: 6px;
|
||||
font-size: 13px;
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
background: rgba(255, 255, 255, 0.95);
|
||||
color: #333;
|
||||
transition: all 0.2s;
|
||||
}
|
||||
|
||||
.datetime-input:focus {
|
||||
outline: none;
|
||||
border-color: rgba(255, 255, 255, 0.6);
|
||||
box-shadow: 0 0 0 3px rgba(255, 255, 255, 0.1);
|
||||
}
|
||||
|
||||
.datetime-input::-webkit-datetime-edit {
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
}
|
||||
|
||||
.datetime-input::-webkit-datetime-edit-fields-wrapper {
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
}
|
||||
|
||||
.datetime-input::-webkit-datetime-edit-text {
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
}
|
||||
|
||||
.datetime-input::-webkit-datetime-edit-month-field,
|
||||
.datetime-input::-webkit-datetime-edit-day-field,
|
||||
.datetime-input::-webkit-datetime-edit-year-field,
|
||||
.datetime-input::-webkit-datetime-edit-hour-field,
|
||||
.datetime-input::-webkit-datetime-edit-minute-field,
|
||||
.datetime-input::-webkit-datetime-edit-second-field,
|
||||
.datetime-input::-webkit-datetime-edit-ampm-field {
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
}
|
||||
|
||||
.query-filters {
|
||||
@@ -2395,11 +2426,24 @@ td {
|
||||
|
||||
.filter-select {
|
||||
padding: 6px 12px;
|
||||
border: 1px solid var(--border-color);
|
||||
border-radius: 4px;
|
||||
font-size: 14px;
|
||||
background: white;
|
||||
border: 2px solid rgba(255, 255, 255, 0.3);
|
||||
border-radius: 6px;
|
||||
font-size: 13px;
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
background: rgba(255, 255, 255, 0.95);
|
||||
color: #333;
|
||||
cursor: pointer;
|
||||
transition: all 0.2s;
|
||||
}
|
||||
|
||||
.filter-select:focus {
|
||||
outline: none;
|
||||
border-color: rgba(255, 255, 255, 0.6);
|
||||
box-shadow: 0 0 0 3px rgba(255, 255, 255, 0.1);
|
||||
}
|
||||
|
||||
.filter-select option {
|
||||
font-family: Consolas, Monaco, 'Courier New', monospace;
|
||||
}
|
||||
|
||||
.query-actions {
|
||||
@@ -2469,6 +2513,8 @@ td {
|
||||
|
||||
.result-tab-content {
|
||||
display: none;
|
||||
max-height: calc(100vh - 220px);
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
.result-tab-content.active {
|
||||
|
||||
+1533
-295
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user