mirror of
https://github.com/rnvm9wjdtj-bot/myaps_api.git
synced 2026-06-02 05:54:40 +00:00
bf42299ead
- 新增 globalobjects/logger/ 模块化日志系统 - 支持异步写入、多目标输出、敏感信息脱敏 - 完全向后兼容原有logger API - 备份旧版本为 logger_v1_backup.py 和 logger_v2_backup.py - 更新 .env.example 和 AGENTS.md 文档
2882 lines
101 KiB
Python
2882 lines
101 KiB
Python
import os
|
||
import logging
|
||
import queue
|
||
import time
|
||
import sys
|
||
import platform
|
||
import threading
|
||
from typing import Optional, Dict, Any, List
|
||
from logging.handlers import TimedRotatingFileHandler, QueueHandler, QueueListener
|
||
|
||
|
||
# 日志流处理器列表 - 用于存储外部注册的日志流处理器
|
||
_log_stream_handlers: List[logging.Handler] = []
|
||
|
||
|
||
class EmojiManager:
|
||
"""
|
||
emoji 管理类,根据终端支持情况提供相应的图标"""
|
||
|
||
def __init__(self):
|
||
self._supported = self.is_emoji_supported()
|
||
self._emojis = {
|
||
'SUCCESS': '✅' if self._supported else '[OK]',
|
||
'FAIL': '❌' if self._supported else '[FAIL]',
|
||
'ERROR': '🚫' if self._supported else '[ERROR]',
|
||
'WARNING': '⚠️' if self._supported else '[WARN]',
|
||
'CRITICAL': '💥' if self._supported else '[CRIT]',
|
||
'START': '⏰' if self._supported else '[START]',
|
||
'STOP': '🛑' if self._supported else '[STOP]',
|
||
'INSERT': '📥' if self._supported else '[INSERT]',
|
||
'UPDATE': '🔄' if self._supported else '[UPDATE]',
|
||
'DELETE': '🗑️' if self._supported else '[DELETE]',
|
||
'QUERY': '🔍' if self._supported else '[QUERY]',
|
||
'CONNECT': '🔗' if self._supported else '[CONNECT]',
|
||
'DISCONNECT': '🔌' if self._supported else '[DISCONNECT]',
|
||
'CACHE': '💾' if self._supported else '[CACHE]',
|
||
'TIMER': '⏱️' if self._supported else '[TIMER]',
|
||
'SYNC': '🔄' if self._supported else '[SYNC]',
|
||
'DEBUG': '🔍' if self._supported else '[DEBUG]',
|
||
'INFO': 'ℹ️' if self._supported else '[INFO]'
|
||
}
|
||
|
||
@property
|
||
def supported(self) -> bool:
|
||
"""是否支持 emoji"""
|
||
return self._supported
|
||
|
||
def get(self, name: str) -> str:
|
||
"""
|
||
获取指定名称的 emoji 或替代文本
|
||
|
||
Args:
|
||
name: emoji 名称
|
||
|
||
Returns:
|
||
str: emoji 或替代文本
|
||
"""
|
||
return self._emojis.get(name.upper(), '')
|
||
|
||
@property
|
||
def SUCCESS(self) -> str:
|
||
return self.get('SUCCESS')
|
||
|
||
@property
|
||
def FAIL(self) -> str:
|
||
return self.get('FAIL')
|
||
|
||
@property
|
||
def ERROR(self) -> str:
|
||
return self.get('ERROR')
|
||
|
||
@property
|
||
def WARNING(self) -> str:
|
||
return self.get('WARNING')
|
||
|
||
@property
|
||
def CRITICAL(self) -> str:
|
||
return self.get('CRITICAL')
|
||
|
||
@property
|
||
def START(self) -> str:
|
||
return self.get('START')
|
||
|
||
@property
|
||
def STOP(self) -> str:
|
||
return self.get('STOP')
|
||
|
||
@property
|
||
def INSERT(self) -> str:
|
||
return self.get('INSERT')
|
||
|
||
@property
|
||
def UPDATE(self) -> str:
|
||
return self.get('UPDATE')
|
||
|
||
@property
|
||
def DELETE(self) -> str:
|
||
return self.get('DELETE')
|
||
|
||
@property
|
||
def QUERY(self) -> str:
|
||
return self.get('QUERY')
|
||
|
||
@property
|
||
def CONNECT(self) -> str:
|
||
return self.get('CONNECT')
|
||
|
||
@property
|
||
def DISCONNECT(self) -> str:
|
||
return self.get('DISCONNECT')
|
||
|
||
@property
|
||
def CACHE(self) -> str:
|
||
return self.get('CACHE')
|
||
|
||
@property
|
||
def TIMER(self) -> str:
|
||
return self.get('TIMER')
|
||
|
||
@property
|
||
def SYNC(self) -> str:
|
||
return self.get('SYNC')
|
||
|
||
@property
|
||
def DEBUG(self) -> str:
|
||
return self.get('DEBUG')
|
||
|
||
@property
|
||
def INFO(self) -> str:
|
||
return self.get('INFO')
|
||
|
||
@staticmethod
|
||
def is_emoji_supported() -> bool:
|
||
"""
|
||
检测当前终端是否支持 emoji 显示
|
||
|
||
Returns:
|
||
bool: 如果终端支持 emoji 返回 True,否则返回 False
|
||
"""
|
||
# 检查操作系统
|
||
if platform.system() != 'Windows':
|
||
# 非 Windows 系统通常支持 emoji
|
||
return True
|
||
|
||
# Windows 系统检查
|
||
windows_version = platform.version()
|
||
try:
|
||
parts = windows_version.split('.')
|
||
if len(parts) >= 3:
|
||
major = int(parts[0])
|
||
build = int(parts[2])
|
||
|
||
# Windows 10 1809 (build 17763) 及以上版本支持 emoji
|
||
# Windows 11 虽然内核版本是 10.0,但 build 版本更高
|
||
if major >= 10 and build >= 17763:
|
||
# 检查是否为 Windows Terminal 或支持 emoji 的终端
|
||
terminal = os.environ.get('TERM', '')
|
||
console_host = os.environ.get('CONSOLE_HOST', '')
|
||
|
||
# Windows Terminal
|
||
if 'WT_SESSION' in os.environ:
|
||
return True
|
||
|
||
# VS Code 终端
|
||
if 'VSCODE_INTEGRATED_TERMINAL' in os.environ:
|
||
return True
|
||
|
||
# ConEmu、Cmder 等增强终端
|
||
if any(term in terminal for term in ['conemu', 'cmder', 'mintty']):
|
||
return True
|
||
|
||
# 检查 PowerShell 版本
|
||
try:
|
||
import subprocess
|
||
result = subprocess.run(
|
||
['powershell', '-Command', '$PSVersionTable.PSVersion.Major'],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=5
|
||
)
|
||
if result.returncode == 0:
|
||
ps_version = int(result.stdout.strip())
|
||
# PowerShell 7+ 更好地支持 emoji
|
||
if ps_version >= 7:
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 对于 Windows 10 1809+ 或 Windows 11,默认支持 emoji
|
||
# 即使不是增强终端,现代 Windows 也支持基本 emoji
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 其他情况默认不支持
|
||
return False
|
||
|
||
# 创建全局实例
|
||
emoji_manager = EmojiManager()
|
||
|
||
|
||
# 检测终端是否支持ANSI颜色
|
||
def is_terminal_supports_ansi():
|
||
"""
|
||
检测终端是否支持ANSI颜色
|
||
"""
|
||
# 检查是否在Windows系统上
|
||
if platform.system() == 'Windows':
|
||
# 在Windows上,检查是否是现代终端
|
||
import ctypes
|
||
try:
|
||
# 获取控制台句柄
|
||
hConsole = ctypes.windll.kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE
|
||
# 检查是否支持虚拟终端处理
|
||
mode = ctypes.c_ulong()
|
||
if ctypes.windll.kernel32.GetConsoleMode(hConsole, ctypes.byref(mode)):
|
||
# 启用虚拟终端处理
|
||
new_mode = mode.value | 0x0004 # ENABLE_VIRTUAL_TERMINAL_PROCESSING
|
||
if ctypes.windll.kernel32.SetConsoleMode(hConsole, new_mode):
|
||
return True
|
||
except Exception:
|
||
pass
|
||
# 回退到不支持
|
||
return False
|
||
else:
|
||
# 在非Windows系统上,检查是否连接到终端
|
||
return sys.stdout.isatty()
|
||
|
||
# 全局变量
|
||
TERMINAL_SUPPORTS_ANSI = is_terminal_supports_ansi()
|
||
|
||
# 尝试导入ctypes并获取控制台句柄(仅在Windows系统上)
|
||
import ctypes
|
||
|
||
# 初始化默认值
|
||
hConsole = None
|
||
original_color = 0
|
||
LEVEL_COLORS = {}
|
||
SUPPORT_WINDOWS_API = False
|
||
|
||
if platform.system() == 'Windows':
|
||
try:
|
||
# 获取控制台句柄
|
||
hConsole = ctypes.windll.kernel32.GetStdHandle(-11) # STD_OUTPUT_HANDLE
|
||
|
||
# 定义CONSOLE_SCREEN_BUFFER_INFO结构
|
||
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
|
||
_fields_ = [
|
||
("dwSize", ctypes.c_ulong),
|
||
("dwCursorPosition", ctypes.c_ulong * 2),
|
||
("wAttributes", ctypes.c_ushort),
|
||
("srWindow", ctypes.c_ulong * 4),
|
||
("dwMaximumWindowSize", ctypes.c_ulong * 2),
|
||
]
|
||
|
||
# 保存当前颜色
|
||
csbi = CONSOLE_SCREEN_BUFFER_INFO()
|
||
ctypes.windll.kernel32.GetConsoleScreenBufferInfo(hConsole, ctypes.byref(csbi))
|
||
original_color = csbi.wAttributes
|
||
|
||
# Windows控制台颜色常量
|
||
FOREGROUND_BLUE = 0x0001
|
||
FOREGROUND_GREEN = 0x0002
|
||
FOREGROUND_RED = 0x0004
|
||
FOREGROUND_INTENSITY = 0x0008
|
||
|
||
# 颜色映射
|
||
LEVEL_COLORS = {
|
||
'DEBUG': FOREGROUND_BLUE | FOREGROUND_GREEN | FOREGROUND_INTENSITY, # 青色
|
||
'INFO': FOREGROUND_GREEN | FOREGROUND_INTENSITY, # 绿色
|
||
'WARNING': FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_INTENSITY, # 黄色
|
||
'ERROR': FOREGROUND_RED | FOREGROUND_INTENSITY, # 红色
|
||
'CRITICAL': FOREGROUND_RED | FOREGROUND_INTENSITY, # 红色
|
||
}
|
||
|
||
# 是否支持Windows API
|
||
SUPPORT_WINDOWS_API = True
|
||
except Exception as e:
|
||
# 如果出错,设置为不支持
|
||
SUPPORT_WINDOWS_API = False
|
||
hConsole = None
|
||
original_color = 0
|
||
LEVEL_COLORS = {}
|
||
print(f"获取控制台句柄失败: {e}")
|
||
|
||
# ANSI颜色代码
|
||
ANSI_COLORS = {
|
||
'DEBUG': '\033[36m', # 青色
|
||
'INFO': '\033[32m', # 绿色
|
||
'WARNING': '\033[33m', # 黄色
|
||
'ERROR': '\033[31m', # 红色
|
||
'CRITICAL': '\033[31m', # 红色
|
||
'RESET': '\033[0m', # 重置
|
||
}
|
||
|
||
# 全局日志配置
|
||
LOG_LEVELS = {
|
||
'DEBUG': logging.DEBUG,
|
||
'INFO': logging.INFO,
|
||
'WARNING': logging.WARNING,
|
||
'ERROR': logging.ERROR,
|
||
'CRITICAL': logging.CRITICAL
|
||
}
|
||
|
||
# 默认日志格式
|
||
DEFAULT_LOG_FORMAT = '%(asctime)s - %(name)s - %(funcName)s:%(lineno)d - %(levelname)s - %(message)s'
|
||
|
||
# 结构化日志格式(JSON)
|
||
JSON_LOG_FORMAT = '''
|
||
{
|
||
"timestamp": "%(asctime)s",
|
||
"module": "%(name)s",
|
||
"function": "%(funcName)s",
|
||
"line": %(lineno)d,
|
||
"level": "%(levelname)s",
|
||
"message": "%(message)s"
|
||
}
|
||
'''
|
||
|
||
# 日志文件配置
|
||
LOG_CONFIG = {
|
||
'default': {
|
||
'filename': 'app.log',
|
||
'level': 'INFO'
|
||
},
|
||
'error': {
|
||
'filename': 'error.log',
|
||
'level': 'ERROR'
|
||
},
|
||
'debug': {
|
||
'filename': 'debug.log',
|
||
'level': 'DEBUG'
|
||
}
|
||
}
|
||
|
||
|
||
class LogHelper:
|
||
"""
|
||
统一日志格式化工具类
|
||
|
||
提供标准化的日志消息格式,确保项目中所有日志输出风格一致。
|
||
|
||
使用示例:
|
||
>>> console_log.info(LogHelper.success("推送采购申请", "单号PR001", "共5条"))
|
||
>>> console_log.error(LogHelper.error("查询PL", "PL001", "网络超时"))
|
||
>>> console_log.info(LogHelper.start("同步任务", "账套A01"))
|
||
"""
|
||
|
||
|
||
class Emoji:
|
||
SUCCESS = emoji_manager.SUCCESS
|
||
FAIL = emoji_manager.FAIL
|
||
ERROR = emoji_manager.ERROR
|
||
WARNING = emoji_manager.WARNING
|
||
CRITICAL = emoji_manager.CRITICAL
|
||
START = emoji_manager.START
|
||
STOP = emoji_manager.STOP
|
||
|
||
INSERT = emoji_manager.INSERT
|
||
UPDATE = emoji_manager.UPDATE
|
||
DELETE = emoji_manager.DELETE
|
||
QUERY = emoji_manager.QUERY
|
||
|
||
CONNECT = emoji_manager.CONNECT
|
||
DISCONNECT = emoji_manager.DISCONNECT
|
||
CACHE = emoji_manager.CACHE
|
||
TIMER = emoji_manager.TIMER
|
||
SYNC = emoji_manager.SYNC
|
||
|
||
DEBUG = emoji_manager.DEBUG
|
||
INFO = emoji_manager.INFO
|
||
|
||
class Template:
|
||
SUCCESS = "{emoji} {action}成功:{subject}"
|
||
SUCCESS_WITH_DETAILS = "{emoji} {action}成功:{subject},{details}"
|
||
|
||
FAIL = "{emoji} {action}失败:{subject}"
|
||
FAIL_WITH_REASON = "{emoji} {action}失败:{subject} - {reason}"
|
||
|
||
START = "{emoji} 开始{action}:{subject}"
|
||
STOP = "{emoji} 结束{action}:{subject}"
|
||
|
||
STATUS_CHANGE = "{emoji} {subject}状态变更:{old_status} -> {new_status}"
|
||
|
||
API_RESPONSE = "{emoji} {api_name}响应:{status_code}"
|
||
API_RESPONSE_WITH_DATA = "{emoji} {api_name}响应:{status_code},{details}"
|
||
|
||
QUERY_RESULT = "{emoji} 查询{target}:{result}"
|
||
QUERY_RESULT_WITH_COUNT = "{emoji} 查询{target}成功:共{count}条数据"
|
||
|
||
DATA_INSERT = "{emoji} 插入{target}:{subject}"
|
||
DATA_INSERT_WITH_COUNT = "{emoji} 插入{target}成功:共{count}条数据"
|
||
|
||
DATA_UPDATE = "{emoji} 更新{target}:{subject}"
|
||
DATA_UPDATE_WITH_COUNT = "{emoji} 更新{target}成功:共{count}条数据"
|
||
|
||
DATA_DELETE = "{emoji} 删除{target}:{subject}"
|
||
DATA_DELETE_WITH_COUNT = "{emoji} 删除{target}成功:共{count}条数据"
|
||
|
||
WARNING = "{emoji} {subject}:{message}"
|
||
ERROR = "{emoji} {subject}:{message}"
|
||
|
||
@staticmethod
|
||
def success(action: str, subject: str, details: str = "") -> str:
|
||
"""
|
||
格式化成功消息
|
||
|
||
Args:
|
||
action: 操作名称,如"推送采购申请"、"同步库存"
|
||
subject: 操作主体,如"单号PR001"、"账套A01"
|
||
details: 额外详情,如"共5条"、"耗时10秒"
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.success("推送采购申请", "单号PR001", "共5条")
|
||
'✅ 推送采购申请成功:单号PR001,共5条'
|
||
"""
|
||
if details:
|
||
return LogHelper.Template.SUCCESS_WITH_DETAILS.format(
|
||
emoji=LogHelper.Emoji.SUCCESS,
|
||
action=action,
|
||
subject=subject,
|
||
details=details
|
||
)
|
||
return LogHelper.Template.SUCCESS.format(
|
||
emoji=LogHelper.Emoji.SUCCESS,
|
||
action=action,
|
||
subject=subject
|
||
)
|
||
|
||
@staticmethod
|
||
def fail(action: str, subject: str, reason: str = "") -> str:
|
||
"""
|
||
格式化失败消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
reason: 失败原因
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.fail("推送采购申请", "单号PR001", "网络超时")
|
||
'❌ 推送采购申请失败:单号PR001 - 网络超时'
|
||
"""
|
||
if reason:
|
||
return LogHelper.Template.FAIL_WITH_REASON.format(
|
||
emoji=LogHelper.Emoji.FAIL,
|
||
action=action,
|
||
subject=subject,
|
||
reason=reason
|
||
)
|
||
return LogHelper.Template.FAIL.format(
|
||
emoji=LogHelper.Emoji.FAIL,
|
||
action=action,
|
||
subject=subject
|
||
)
|
||
|
||
@staticmethod
|
||
def error(action: str, subject: str, reason: str = "") -> str:
|
||
"""
|
||
格式化错误消息(与fail类似,使用不同的emoji)
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
reason: 错误原因
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if reason:
|
||
return f"{LogHelper.Emoji.ERROR} {action}失败:{subject} - {reason}"
|
||
return f"{LogHelper.Emoji.ERROR} {action}失败:{subject}"
|
||
|
||
@staticmethod
|
||
def start(action: str, subject: str = "") -> str:
|
||
"""
|
||
格式化开始消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.start("同步任务", "账套A01")
|
||
'⏰ 开始同步任务:账套A01'
|
||
"""
|
||
if subject:
|
||
return LogHelper.Template.START.format(
|
||
emoji=LogHelper.Emoji.START,
|
||
action=action,
|
||
subject=subject
|
||
)
|
||
return f"{LogHelper.Emoji.START} 开始{action}"
|
||
|
||
@staticmethod
|
||
def stop(action: str, subject: str = "") -> str:
|
||
"""
|
||
格式化结束消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if subject:
|
||
return LogHelper.Template.STOP.format(
|
||
emoji=LogHelper.Emoji.STOP,
|
||
action=action,
|
||
subject=subject
|
||
)
|
||
return f"{LogHelper.Emoji.STOP} 结束{action}"
|
||
|
||
@staticmethod
|
||
def status_change(subject: str, old_status: str, new_status: str) -> str:
|
||
"""
|
||
格式化状态变更消息
|
||
|
||
Args:
|
||
subject: 操作主体
|
||
old_status: 旧状态
|
||
new_status: 新状态
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.status_change("PL001", "待处理", "已确认")
|
||
'🔄 PL001状态变更:待处理 -> 已确认'
|
||
"""
|
||
return LogHelper.Template.STATUS_CHANGE.format(
|
||
emoji=LogHelper.Emoji.UPDATE,
|
||
subject=subject,
|
||
old_status=old_status,
|
||
new_status=new_status
|
||
)
|
||
|
||
@staticmethod
|
||
def api_response(api_name: str, status_code: int, details: str = "") -> str:
|
||
"""
|
||
格式化API响应消息
|
||
|
||
Args:
|
||
api_name: API名称
|
||
status_code: HTTP状态码
|
||
details: 额外详情(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.api_response("更新PL状态", 200)
|
||
'✅ 更新PL状态响应:200'
|
||
"""
|
||
emoji = LogHelper.Emoji.SUCCESS if 200 <= status_code < 300 else LogHelper.Emoji.FAIL
|
||
if details:
|
||
return LogHelper.Template.API_RESPONSE_WITH_DATA.format(
|
||
emoji=emoji,
|
||
api_name=api_name,
|
||
status_code=status_code,
|
||
details=details
|
||
)
|
||
return LogHelper.Template.API_RESPONSE.format(
|
||
emoji=emoji,
|
||
api_name=api_name,
|
||
status_code=status_code
|
||
)
|
||
|
||
@staticmethod
|
||
def query(target: str, result: str = "", count: int = None) -> str:
|
||
"""
|
||
格式化查询消息
|
||
|
||
Args:
|
||
target: 查询目标
|
||
result: 查询结果描述
|
||
count: 结果数量(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
|
||
Example:
|
||
>>> LogHelper.query("PL信息", count=10)
|
||
'✅ 查询PL信息成功:共10条'
|
||
"""
|
||
if count is not None:
|
||
return LogHelper.Template.QUERY_RESULT_WITH_COUNT.format(
|
||
emoji=LogHelper.Emoji.SUCCESS,
|
||
target=target,
|
||
count=count
|
||
)
|
||
if result:
|
||
return LogHelper.Template.QUERY_RESULT.format(
|
||
emoji=LogHelper.Emoji.QUERY,
|
||
target=target,
|
||
result=result
|
||
)
|
||
return f"{LogHelper.Emoji.QUERY} 开始查询{target}"
|
||
|
||
@staticmethod
|
||
def insert(target: str, subject: str = "", count: int = None) -> str:
|
||
"""
|
||
格式化插入消息
|
||
|
||
Args:
|
||
target: 插入目标表/集合
|
||
subject: 插入主体(可选)
|
||
count: 插入数量(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if count is not None:
|
||
return LogHelper.Template.DATA_INSERT_WITH_COUNT.format(
|
||
emoji=LogHelper.Emoji.INSERT,
|
||
target=target,
|
||
count=count
|
||
)
|
||
if subject:
|
||
return LogHelper.Template.DATA_INSERT.format(
|
||
emoji=LogHelper.Emoji.INSERT,
|
||
target=target,
|
||
subject=subject
|
||
)
|
||
return f"{LogHelper.Emoji.INSERT} 插入{target}"
|
||
|
||
@staticmethod
|
||
def update(target: str, subject: str = "", count: int = None) -> str:
|
||
"""
|
||
格式化更新消息
|
||
|
||
Args:
|
||
target: 更新目标表/集合
|
||
subject: 更新主体(可选)
|
||
count: 更新数量(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if count is not None:
|
||
return LogHelper.Template.DATA_UPDATE_WITH_COUNT.format(
|
||
emoji=LogHelper.Emoji.UPDATE,
|
||
target=target,
|
||
count=count
|
||
)
|
||
if subject:
|
||
return LogHelper.Template.DATA_UPDATE.format(
|
||
emoji=LogHelper.Emoji.UPDATE,
|
||
target=target,
|
||
subject=subject
|
||
)
|
||
return f"{LogHelper.Emoji.UPDATE} 更新{target}"
|
||
|
||
@staticmethod
|
||
def delete(target: str, subject: str = "", count: int = None) -> str:
|
||
"""
|
||
格式化删除消息
|
||
|
||
Args:
|
||
target: 删除目标表/集合
|
||
subject: 删除主体(可选)
|
||
count: 删除数量(可选)
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if count is not None:
|
||
return LogHelper.Template.DATA_DELETE_WITH_COUNT.format(
|
||
emoji=LogHelper.Emoji.DELETE,
|
||
target=target,
|
||
count=count
|
||
)
|
||
if subject:
|
||
return LogHelper.Template.DATA_DELETE.format(
|
||
emoji=LogHelper.Emoji.DELETE,
|
||
target=target,
|
||
subject=subject
|
||
)
|
||
return f"{LogHelper.Emoji.DELETE} 删除{target}"
|
||
|
||
@staticmethod
|
||
def warning(subject: str, message: str) -> str:
|
||
"""
|
||
格式化警告消息
|
||
|
||
Args:
|
||
subject: 警告主体
|
||
message: 警告信息
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
return LogHelper.Template.WARNING.format(
|
||
emoji=LogHelper.Emoji.WARNING,
|
||
subject=subject,
|
||
message=message
|
||
)
|
||
|
||
@staticmethod
|
||
def sync(action: str, subject: str = "", details: str = "") -> str:
|
||
"""
|
||
格式化同步消息
|
||
|
||
Args:
|
||
action: 同步操作名称
|
||
subject: 同步主体
|
||
details: 额外详情
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if details:
|
||
return f"{LogHelper.Emoji.SYNC} {action}:{subject},{details}"
|
||
if subject:
|
||
return f"{LogHelper.Emoji.SYNC} {action}:{subject}"
|
||
return f"{LogHelper.Emoji.SYNC} {action}"
|
||
|
||
@staticmethod
|
||
def connect(target: str, status: str = "成功") -> str:
|
||
"""
|
||
格式化连接消息
|
||
|
||
Args:
|
||
target: 连接目标
|
||
status: 连接状态
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
emoji = LogHelper.Emoji.CONNECT if status == "成功" else LogHelper.Emoji.ERROR
|
||
return f"{emoji} 连接{target}{status}"
|
||
|
||
@staticmethod
|
||
def disconnect(target: str) -> str:
|
||
"""
|
||
格式化断开连接消息
|
||
|
||
Args:
|
||
target: 断开目标
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
return f"{LogHelper.Emoji.DISCONNECT} 断开{target}连接"
|
||
|
||
@staticmethod
|
||
def cache(action: str, target: str = "", details: str = "") -> str:
|
||
"""
|
||
格式化缓存消息
|
||
|
||
Args:
|
||
action: 缓存操作(如"刷新"、"清理")
|
||
target: 缓存目标
|
||
details: 额外详情
|
||
|
||
Returns:
|
||
格式化后的日志消息
|
||
"""
|
||
if details:
|
||
return f"{LogHelper.Emoji.CACHE} {action}缓存:{target},{details}"
|
||
if target:
|
||
return f"{LogHelper.Emoji.CACHE} {action}缓存:{target}"
|
||
return f"{LogHelper.Emoji.CACHE} {action}缓存"
|
||
|
||
|
||
# 存储多个logger实例和对应的listener
|
||
logger_instances = {}
|
||
listeners = {}
|
||
db_initialized = False # 全局数据库初始化状态
|
||
|
||
|
||
class DatePrefixRotatingFileHandler(TimedRotatingFileHandler):
|
||
"""
|
||
自定义的按时间轮转的文件处理器
|
||
|
||
特点:
|
||
- app.log 保存最近N天的日志(默认10天)
|
||
- 轮替时,从 app.log 中提取历史日期的日志到单独文件
|
||
- 自动清理超过备份数量的旧日志文件
|
||
"""
|
||
|
||
DEFAULT_RETENTION_DAYS = 10
|
||
|
||
def __init__(self, *args, **kwargs):
|
||
self.retention_days = kwargs.pop('retention_days', self.DEFAULT_RETENTION_DAYS)
|
||
super().__init__(*args, **kwargs)
|
||
self.encoding = kwargs.get('encoding', 'utf-8')
|
||
self.rolloverAt = self.computeRollover(int(time.time()))
|
||
self._rollover_lock = threading.Lock()
|
||
self._last_rollover_date = self._get_today_str()
|
||
|
||
def _get_today_str(self) -> str:
|
||
"""获取今天的日期字符串"""
|
||
return time.strftime("%Y-%m-%d", time.localtime())
|
||
|
||
def _get_retention_dates(self) -> set:
|
||
"""获取需要保留的日期集合(最近N天)"""
|
||
retention_dates = set()
|
||
current_time = time.time()
|
||
for i in range(self.retention_days):
|
||
date_str = time.strftime("%Y-%m-%d", time.localtime(current_time - i * 86400))
|
||
retention_dates.add(date_str)
|
||
return retention_dates
|
||
|
||
def emit(self, record):
|
||
current_time = int(time.time())
|
||
if current_time >= self.rolloverAt:
|
||
if self._rollover_lock.acquire(blocking=False):
|
||
try:
|
||
if current_time >= self.rolloverAt:
|
||
self.doRollover()
|
||
finally:
|
||
self._rollover_lock.release()
|
||
# 直接写入,不调用父类的emit以避免父类的doRollover被调用
|
||
if self.stream is None:
|
||
self.stream = self._open()
|
||
logging.handlers.BaseRotatingHandler.emit(self, record)
|
||
|
||
def doRollover(self):
|
||
"""
|
||
轮替方法:
|
||
1. 从 app.log 中提取历史日期的日志到单独文件
|
||
2. 清理 app.log,仅保留最近N天的日志
|
||
3. 清理超过备份数量的旧文件
|
||
"""
|
||
if self.stream:
|
||
self.stream.close()
|
||
self.stream = None
|
||
|
||
current_time = int(time.time())
|
||
self.rolloverAt = self.computeRollover(current_time)
|
||
|
||
if self.backupCount > 0:
|
||
base_dir, filename = os.path.split(self.baseFilename)
|
||
name_without_ext, ext = os.path.splitext(filename)
|
||
|
||
self._extract_and_clean_logs(base_dir, name_without_ext, ext)
|
||
|
||
self._delete_old_logs(base_dir, name_without_ext, ext)
|
||
|
||
self.mode = 'a'
|
||
self.stream = self._open()
|
||
self._last_rollover_date = self._get_today_str()
|
||
|
||
def _extract_and_clean_logs(self, base_dir: str, name_without_ext: str, ext: str):
|
||
"""
|
||
从 app.log 中提取历史日期的日志到单独文件,并清理 app.log
|
||
|
||
Args:
|
||
base_dir: 日志目录
|
||
name_without_ext: 日志文件名(不含扩展名)
|
||
ext: 日志文件扩展名
|
||
"""
|
||
if not os.path.exists(self.baseFilename):
|
||
return
|
||
|
||
retention_dates = self._get_retention_dates()
|
||
|
||
logs_by_date = {}
|
||
retained_logs = []
|
||
|
||
try:
|
||
with open(self.baseFilename, 'r', encoding=self.encoding, errors='replace') as f:
|
||
for line in f:
|
||
date_str = self._extract_date_from_line(line)
|
||
if date_str:
|
||
if date_str in retention_dates:
|
||
retained_logs.append(line)
|
||
else:
|
||
if date_str not in logs_by_date:
|
||
logs_by_date[date_str] = []
|
||
logs_by_date[date_str].append(line)
|
||
else:
|
||
retained_logs.append(line)
|
||
except Exception:
|
||
return
|
||
|
||
for date_str, lines in logs_by_date.items():
|
||
if not lines:
|
||
continue
|
||
|
||
date_prefix = date_str.replace('-', '')
|
||
new_filename = f"{date_prefix}_{name_without_ext}{ext}"
|
||
new_filepath = os.path.join(base_dir, new_filename)
|
||
|
||
try:
|
||
mode = 'a' if os.path.exists(new_filepath) else 'w'
|
||
with open(new_filepath, mode, encoding=self.encoding) as f:
|
||
f.writelines(lines)
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
with open(self.baseFilename, 'w', encoding=self.encoding) as f:
|
||
f.writelines(retained_logs)
|
||
except Exception:
|
||
pass
|
||
|
||
def _extract_date_from_line(self, line: str) -> str:
|
||
"""
|
||
从日志行中提取日期
|
||
|
||
Args:
|
||
line: 日志行,格式如 "2026-04-05 09:35:01,177 - ..."
|
||
|
||
Returns:
|
||
日期字符串 "YYYY-MM-DD" 或 None
|
||
"""
|
||
if len(line) < 10:
|
||
return None
|
||
|
||
date_part = line[:10]
|
||
if (len(date_part) == 10 and
|
||
date_part[4] == '-' and date_part[7] == '-' and
|
||
date_part[:4].isdigit() and date_part[5:7].isdigit() and date_part[8:10].isdigit()):
|
||
return date_part
|
||
return None
|
||
|
||
def _delete_old_logs(self, base_dir: str, name_without_ext: str, ext: str):
|
||
"""
|
||
删除超过备份数量的旧日志文件
|
||
|
||
Args:
|
||
base_dir: 日志目录
|
||
name_without_ext: 日志文件名(不含扩展名)
|
||
ext: 日志文件扩展名
|
||
"""
|
||
import glob
|
||
|
||
pattern = os.path.join(base_dir, f"????????_{name_without_ext}{ext}")
|
||
log_files = glob.glob(pattern)
|
||
|
||
if len(log_files) <= self.backupCount:
|
||
return
|
||
|
||
def extract_date(filepath: str) -> str:
|
||
filename = os.path.basename(filepath)
|
||
return filename[:8] if len(filename) >= 8 and filename[:8].isdigit() else "00000000"
|
||
|
||
log_files.sort(key=extract_date, reverse=True)
|
||
|
||
for old_file in log_files[self.backupCount:]:
|
||
try:
|
||
if os.path.exists(old_file):
|
||
os.remove(old_file)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
|
||
|
||
|
||
def setup_file_logging(log_name: str, log_filename='app.log') -> logging.Logger:
|
||
"""
|
||
设置文件日志配置
|
||
支持多个不同文件名的logger实例
|
||
|
||
Args:
|
||
log_name: 日志名称
|
||
log_filename: 日志文件名
|
||
|
||
Returns:
|
||
logging.Logger: 配置好的logger实例
|
||
"""
|
||
# 使用log_filename作为key,确保不同文件名有不同的logger
|
||
logger_key = f"{log_name}:{log_filename}"
|
||
|
||
if logger_key in logger_instances:
|
||
return logger_instances[logger_key]
|
||
|
||
logger = logging.getLogger(f"{log_name}_{log_filename}")
|
||
# 防止重复添加处理器
|
||
if logger.handlers:
|
||
logger_instances[logger_key] = logger
|
||
return logger
|
||
|
||
logger.setLevel(logging.DEBUG)
|
||
# 关闭日志传播,防止重复输出
|
||
logger.propagate = False
|
||
formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
|
||
|
||
log_dir = "logs"
|
||
if not os.path.exists(log_dir):
|
||
os.makedirs(log_dir)
|
||
# 创建按时间轮替的 FileHandler(支持日期前缀)
|
||
timed_handler = DatePrefixRotatingFileHandler(
|
||
filename=os.path.join(log_dir, log_filename),
|
||
when='midnight',
|
||
interval=1,
|
||
backupCount=7,
|
||
encoding='utf-8'
|
||
)
|
||
timed_handler.setLevel(logging.DEBUG)
|
||
timed_handler.setFormatter(formatter)
|
||
|
||
# 创建队列和 QueueListener
|
||
log_queue = queue.Queue(-1)
|
||
|
||
# 获取日志流处理器(如果存在)
|
||
stream_handlers = []
|
||
if hasattr(__import__(__name__), '_log_stream_handlers'):
|
||
stream_handlers = getattr(__import__(__name__), '_log_stream_handlers', [])
|
||
|
||
# 创建 QueueListener,包含文件处理器和日志流处理器
|
||
listener = QueueListener(log_queue, timed_handler, *stream_handlers, respect_handler_level=True)
|
||
|
||
# 存储listener
|
||
listeners[logger_key] = listener
|
||
|
||
# 创建 QueueHandler 并添加到 logger
|
||
queue_handler = QueueHandler(log_queue)
|
||
logger.addHandler(queue_handler)
|
||
|
||
# 存储logger实例
|
||
logger_instances[logger_key] = logger
|
||
|
||
# 注意:这里不在这里启动 listener,而是在 lifespan 的启动阶段启动
|
||
return logger
|
||
|
||
|
||
def start_all_listeners():
|
||
"""启动所有存储的listener"""
|
||
for key, listener in listeners.items():
|
||
try:
|
||
listener.start()
|
||
except Exception as e:
|
||
pass
|
||
|
||
|
||
def close_logging():
|
||
"""关闭所有日志系统"""
|
||
# 停止并清理所有listener
|
||
for key, listener in listeners.items():
|
||
try:
|
||
listener.stop()
|
||
except AttributeError:
|
||
# 处理listener未启动的情况
|
||
pass
|
||
listeners.clear()
|
||
|
||
# 清理所有logger实例和其handlers
|
||
for key, logger in logger_instances.items():
|
||
# 移除所有handlers
|
||
for handler in logger.handlers[:]:
|
||
# 关闭handler
|
||
if hasattr(handler, 'close'):
|
||
handler.close()
|
||
# 移除handler
|
||
logger.removeHandler(handler)
|
||
logger_instances.clear()
|
||
|
||
|
||
def get_log_level(level_name: str) -> int:
|
||
"""
|
||
获取日志级别
|
||
|
||
Args:
|
||
level_name: 日志级别名称
|
||
|
||
Returns:
|
||
对应的日志级别数值
|
||
"""
|
||
return LOG_LEVELS.get(level_name.upper(), logging.INFO)
|
||
|
||
|
||
|
||
|
||
|
||
class SmartLogger(logging.Logger):
|
||
"""
|
||
智能日志器类,扩展便捷方法,支持同时输出到控制台和文件
|
||
|
||
使用示例:
|
||
>>> console_log.success("推送采购申请", "单号PR001", "共5条")
|
||
>>> console_log.fail("查询PL", "PL001", "网络超时") # 自动同时写入文件
|
||
>>> console_log.start("同步任务", "账套A01")
|
||
"""
|
||
|
||
_file_logger = None
|
||
_auto_file_enabled = True
|
||
_db_enabled = True
|
||
_db_min_level = logging.INFO # 只记录 INFO 级别及以上的日志到数据库
|
||
_db_initialized = False # 数据库是否已初始化
|
||
|
||
def set_file_logger(self, file_logger) -> None:
|
||
"""
|
||
设置关联的文件日志器
|
||
|
||
Args:
|
||
file_logger: 文件日志器实例
|
||
"""
|
||
self._file_logger = file_logger
|
||
|
||
def enable_auto_file(self) -> None:
|
||
"""启用自动文件日志(默认启用)"""
|
||
self._auto_file_enabled = True
|
||
|
||
def disable_auto_file(self) -> None:
|
||
"""禁用自动文件日志"""
|
||
self._auto_file_enabled = False
|
||
|
||
def enable_db_logging(self) -> None:
|
||
"""启用数据库日志(默认启用)"""
|
||
self._db_enabled = True
|
||
|
||
def disable_db_logging(self) -> None:
|
||
"""禁用数据库日志"""
|
||
self._db_enabled = False
|
||
|
||
def set_db_min_level(self, level: int) -> None:
|
||
"""设置数据库日志的最低级别"""
|
||
self._db_min_level = level
|
||
|
||
def set_db_initialized(self, initialized: bool = True) -> None:
|
||
"""标记数据库是否已初始化"""
|
||
self._db_initialized = initialized
|
||
|
||
def set_db_initialized_all(self, initialized: bool = True) -> None:
|
||
"""标记所有日志器数据库是否已初始化"""
|
||
global db_initialized
|
||
db_initialized = initialized
|
||
for logger in logger_instances.values():
|
||
if isinstance(logger, SmartLogger):
|
||
logger.set_db_initialized(initialized)
|
||
|
||
def _get_caller_info(self):
|
||
"""
|
||
获取调用者信息(模块名、函数名、行号)
|
||
在异步任务创建之前调用,确保获取正确的调用栈
|
||
"""
|
||
try:
|
||
import inspect
|
||
|
||
stack = inspect.stack()
|
||
|
||
# 需要跳过的模块/函数名称
|
||
skip_modules = {'asyncio', 'asyncio.events', 'globalobjects.logger', 'logging', 'uvicorn', 'uvicorn.server', 'uvicorn.protocols', 'uvicorn.workers'}
|
||
skip_functions = {'_run', '_log', '_log_to_file', '_log_to_db', '_get_caller_info', 'info', 'debug', 'warning', 'error', 'critical', 'run', 'serve', 'handle'}
|
||
|
||
caller_frame = None
|
||
|
||
for i, frame_info in enumerate(stack[1:]):
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
func_name = frame_info.function
|
||
|
||
class_name = None
|
||
if 'self' in frame.f_locals:
|
||
try:
|
||
class_name = frame.f_locals['self'].__class__.__name__
|
||
except Exception:
|
||
pass
|
||
|
||
# 跳过内部模块和函数
|
||
is_internal = False
|
||
if class_name == 'SmartLogger':
|
||
is_internal = True
|
||
elif module_name in skip_modules:
|
||
is_internal = True
|
||
elif func_name in skip_functions:
|
||
is_internal = True
|
||
elif module_name.startswith('asyncio'):
|
||
is_internal = True
|
||
elif module_name.startswith('logging'):
|
||
is_internal = True
|
||
elif module_name.startswith('uvicorn'):
|
||
is_internal = True
|
||
elif module_name.startswith('starlette'):
|
||
is_internal = True
|
||
elif module_name.startswith('fastapi'):
|
||
is_internal = True
|
||
|
||
if not is_internal:
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
# 如果没有找到合适的调用者,尝试从栈中寻找
|
||
if not caller_frame:
|
||
for i in range(1, min(len(stack), 20)):
|
||
frame_info = stack[i]
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
if not (module_name.startswith('asyncio') or
|
||
module_name.startswith('logging') or
|
||
module_name.startswith('uvicorn') or
|
||
module_name.startswith('starlette') or
|
||
module_name.startswith('fastapi') or
|
||
module_name == 'globalobjects.logger'):
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
if caller_frame:
|
||
return {
|
||
'module': caller_frame.frame.f_globals.get('__name__', ''),
|
||
'function': caller_frame.function,
|
||
'line_number': caller_frame.lineno
|
||
}
|
||
|
||
except Exception:
|
||
pass
|
||
|
||
return None
|
||
|
||
async def _log_to_database(self, level: int, msg: str, caller_info=None, **kwargs) -> None:
|
||
"""
|
||
异步写入数据库
|
||
|
||
Args:
|
||
level: 日志级别
|
||
msg: 日志消息
|
||
**kwargs: 额外的日志参数
|
||
"""
|
||
from core.settings import SQLITE_FILE
|
||
|
||
# 检查数据库是否已初始化
|
||
global db_initialized
|
||
if not db_initialized:
|
||
return
|
||
|
||
# 检查数据库日志是否启用
|
||
if not self._db_enabled or level < self._db_min_level:
|
||
return
|
||
|
||
try:
|
||
import inspect
|
||
import os
|
||
import threading
|
||
|
||
# 尝试导入 SystemLog 模型
|
||
try:
|
||
from apps.common.monitor.models import SystemLog
|
||
except Exception:
|
||
# 导入失败,不记录到文件,避免递归
|
||
return
|
||
|
||
# 获取调用栈
|
||
try:
|
||
stack = inspect.stack()
|
||
except Exception:
|
||
# 获取调用栈失败,不记录到文件,避免递归
|
||
stack = []
|
||
|
||
# 跳过内部方法,找到原始调用位置
|
||
caller_frame = None
|
||
try:
|
||
# 需要跳过的模块/函数名称
|
||
skip_modules = {'asyncio', 'asyncio.events', 'globalobjects.logger', 'logging', 'uvicorn', 'uvicorn.server', 'uvicorn.protocols', 'uvicorn.workers'}
|
||
skip_functions = {'_run', '_log', '_log_to_file', '_log_to_db', 'info', 'debug', 'warning', 'error', 'critical', 'run', 'serve', 'handle'}
|
||
|
||
for i, frame_info in enumerate(stack[1:]):
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
func_name = frame_info.function
|
||
|
||
class_name = None
|
||
if 'self' in frame.f_locals:
|
||
try:
|
||
class_name = frame.f_locals['self'].__class__.__name__
|
||
except Exception:
|
||
pass
|
||
|
||
# 跳过 SmartLogger 类内部调用和其他内部模块
|
||
is_internal = False
|
||
if class_name == 'SmartLogger':
|
||
is_internal = True
|
||
elif module_name in skip_modules:
|
||
is_internal = True
|
||
elif func_name in skip_functions:
|
||
is_internal = True
|
||
elif module_name.startswith('asyncio'):
|
||
is_internal = True
|
||
elif module_name.startswith('logging'):
|
||
is_internal = True
|
||
elif module_name.startswith('uvicorn'):
|
||
is_internal = True
|
||
elif module_name.startswith('starlette'):
|
||
is_internal = True
|
||
elif module_name.startswith('fastapi'):
|
||
is_internal = True
|
||
|
||
if not is_internal:
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
# 如果没有找到合适的调用者,尝试从栈中寻找第一个不是内部模块的帧
|
||
if not caller_frame:
|
||
for i in range(1, min(len(stack), 20)):
|
||
frame_info = stack[i]
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
if not (module_name.startswith('asyncio') or
|
||
module_name.startswith('logging') or
|
||
module_name.startswith('uvicorn') or
|
||
module_name.startswith('starlette') or
|
||
module_name.startswith('fastapi') or
|
||
module_name == 'globalobjects.logger'):
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
# 最后的后备方案
|
||
if not caller_frame and len(stack) > 1:
|
||
caller_frame = stack[-1]
|
||
except Exception:
|
||
# 解析调用栈失败,不记录到文件,避免递归
|
||
pass
|
||
|
||
# 获取堆栈跟踪(仅ERROR及以上级别)
|
||
stack_trace = None
|
||
if level >= logging.ERROR:
|
||
try:
|
||
import traceback
|
||
stack_trace = ''.join(traceback.format_stack())
|
||
except Exception:
|
||
# 获取堆栈跟踪失败,不记录到文件,避免递归
|
||
pass
|
||
|
||
# 创建日志记录
|
||
module_name = ''
|
||
function_name = ''
|
||
line_no = 0
|
||
|
||
# 优先使用传入的 caller_info(在同步上下文中获取的)
|
||
if caller_info:
|
||
module_name = caller_info.get('module', '')
|
||
function_name = caller_info.get('function', '')
|
||
line_no = caller_info.get('line_number', 0)
|
||
elif caller_frame:
|
||
try:
|
||
module_name = caller_frame.frame.f_globals.get('__name__', '')
|
||
function_name = caller_frame.function
|
||
line_no = caller_frame.lineno
|
||
except Exception:
|
||
# 获取调用信息失败,使用默认值
|
||
pass
|
||
|
||
# 尝试创建日志记录
|
||
try:
|
||
# 确保模型有正确的默认连接
|
||
SystemLog._meta.default_connection = SQLITE_FILE
|
||
|
||
await SystemLog.create(
|
||
level=logging.getLevelName(level),
|
||
module=module_name,
|
||
function=function_name,
|
||
line_number=line_no,
|
||
message=msg,
|
||
details=str(kwargs) if kwargs else None,
|
||
stack_trace=stack_trace,
|
||
process_id=os.getpid(),
|
||
thread_id=threading.get_ident(),
|
||
thread_name=threading.current_thread().name
|
||
)
|
||
except Exception:
|
||
# 数据库写入失败,不记录到文件,避免递归
|
||
pass
|
||
except Exception:
|
||
# 其他错误,不记录到文件,避免递归
|
||
pass
|
||
|
||
def _log_to_file(self, level: int, msg: str) -> None:
|
||
"""
|
||
同时写入文件日志
|
||
|
||
Args:
|
||
level: 日志级别
|
||
msg: 日志消息
|
||
"""
|
||
if self._auto_file_enabled and self._file_logger:
|
||
# 只在error级别及以上使用栈追踪
|
||
if level >= logging.ERROR:
|
||
import inspect
|
||
import threading
|
||
import os
|
||
|
||
# 获取调用栈
|
||
stack = inspect.stack()
|
||
|
||
# 跳过内部方法,找到原始调用位置
|
||
caller_frame = None
|
||
# 遍历所有栈帧,找到第一个不是SmartLogger类的方法
|
||
for i, frame_info in enumerate(stack[1:]): # 从栈的第二个元素开始(跳过当前函数)
|
||
frame = frame_info.frame
|
||
# 获取类名
|
||
class_name = None
|
||
if 'self' in frame.f_locals:
|
||
try:
|
||
class_name = frame.f_locals['self'].__class__.__name__
|
||
except Exception:
|
||
pass
|
||
|
||
# 检查是否是SmartLogger类的方法
|
||
if class_name != 'SmartLogger':
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
# 如果没有找到,使用栈的最后一个元素(最外层调用)
|
||
if not caller_frame and len(stack) > 1:
|
||
caller_frame = stack[-1]
|
||
|
||
# 如果找到原始调用位置,使用其信息
|
||
if caller_frame:
|
||
# 创建一个自定义的日志记录,替换函数名和行号
|
||
record = logging.makeLogRecord({
|
||
'levelno': level,
|
||
'levelname': logging.getLevelName(level),
|
||
'msg': msg,
|
||
'args': (),
|
||
'exc_info': None,
|
||
'exc_text': None,
|
||
'stack_info': None,
|
||
'lineno': caller_frame.lineno,
|
||
'funcName': caller_frame.function,
|
||
'filename': caller_frame.filename,
|
||
'module': os.path.basename(caller_frame.filename).split('.')[0],
|
||
'name': self.name,
|
||
'created': time.time(),
|
||
'msecs': (time.time() % 1) * 1000,
|
||
'relativeCreated': 0,
|
||
'thread': threading.get_ident(),
|
||
'threadName': threading.current_thread().name,
|
||
'process': os.getpid(),
|
||
'processName': 'MainProcess'
|
||
})
|
||
self._file_logger.handle(record)
|
||
else:
|
||
# 如果没有找到,使用默认方式
|
||
self._file_logger.log(level, msg)
|
||
else:
|
||
# 普通级别使用默认方式
|
||
self._file_logger.log(level, msg)
|
||
|
||
def _send_to_log_stream(self, level: int, msg: str):
|
||
"""
|
||
将日志发送到日志流处理器
|
||
"""
|
||
try:
|
||
from apps.common.monitor.log_stream_service import _log_stream_manager
|
||
handlers = _log_stream_manager.get_handlers()
|
||
if not handlers:
|
||
return
|
||
|
||
import inspect
|
||
import os
|
||
|
||
# 获取调用信息
|
||
stack = inspect.stack()
|
||
caller_frame = None
|
||
# 查找不是 SmartLogger 类的调用者
|
||
for i, frame_info in enumerate(stack[1:]):
|
||
frame = frame_info.frame
|
||
class_name = None
|
||
if 'self' in frame.f_locals:
|
||
try:
|
||
class_name = frame.f_locals['self'].__class__.__name__
|
||
except Exception:
|
||
pass
|
||
if class_name != 'SmartLogger':
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
if not caller_frame and len(stack) > 1:
|
||
caller_frame = stack[-1]
|
||
|
||
# 浏览器一定支持 emoji!将降级文本恢复为 emoji
|
||
stream_msg = msg
|
||
# 创建一个映射:降级文本 -> emoji
|
||
emoji_replacement = {
|
||
'[OK]': '✅',
|
||
'[FAIL]': '❌',
|
||
'[ERROR]': '🚫',
|
||
'[WARN]': '⚠️',
|
||
'[CRIT]': '💥',
|
||
'[START]': '⏰',
|
||
'[STOP]': '🛑',
|
||
'[INSERT]': '📥',
|
||
'[UPDATE]': '🔄',
|
||
'[DELETE]': '🗑️',
|
||
'[QUERY]': '🔍',
|
||
'[CONNECT]': '🔗',
|
||
'[DISCONNECT]': '🔌',
|
||
'[CACHE]': '💾',
|
||
'[TIMER]': '⏱️',
|
||
'[SYNC]': '🔄',
|
||
'[DEBUG]': '🔍',
|
||
'[INFO]': 'ℹ️'
|
||
}
|
||
# 替换所有降级文本
|
||
for text, emoji in emoji_replacement.items():
|
||
stream_msg = stream_msg.replace(text, emoji)
|
||
|
||
# 创建 LogRecord
|
||
record = logging.LogRecord(
|
||
name=self.name,
|
||
level=level,
|
||
pathname=caller_frame.filename if caller_frame else __file__,
|
||
lineno=caller_frame.lineno if caller_frame else 0,
|
||
msg=stream_msg,
|
||
args=(),
|
||
exc_info=None,
|
||
func=caller_frame.function if caller_frame else ''
|
||
)
|
||
record.module = self.name
|
||
|
||
_log_stream_manager.emit_to_handlers(record)
|
||
except Exception:
|
||
pass
|
||
|
||
def debug(self, msg, *args, **kwargs):
|
||
"""记录 DEBUG 级别的日志"""
|
||
# 检查当前日志器的级别
|
||
if self.isEnabledFor(logging.DEBUG):
|
||
# 获取调用者信息(在创建异步任务之前获取,确保调用栈正确)
|
||
caller_info = self._get_caller_info()
|
||
|
||
# 异步写入数据库
|
||
try:
|
||
import asyncio
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_running():
|
||
# 格式化消息,处理格式化失败的情况
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
asyncio.create_task(self._log_to_database(logging.DEBUG, formatted_msg, caller_info=caller_info, **kwargs))
|
||
except Exception:
|
||
pass
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['DEBUG']}{timestamp} - DEBUG - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.DEBUG, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().debug(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['DEBUG'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - DEBUG - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.DEBUG, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().debug(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用原始方法
|
||
super().debug(msg, *args, **kwargs)
|
||
|
||
def info(self, msg, *args, **kwargs):
|
||
"""记录 INFO 级别的日志"""
|
||
# 检查当前日志器的级别
|
||
if self.isEnabledFor(logging.INFO):
|
||
# 获取调用者信息(在创建异步任务之前获取,确保调用栈正确)
|
||
caller_info = self._get_caller_info()
|
||
|
||
# 异步写入数据库
|
||
try:
|
||
import asyncio
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_running():
|
||
# 格式化消息,处理格式化失败的情况
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
asyncio.create_task(self._log_to_database(logging.INFO, formatted_msg, caller_info=caller_info, **kwargs))
|
||
except Exception:
|
||
pass
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['INFO']}{timestamp} - INFO - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.INFO, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().info(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['INFO'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - INFO - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.INFO, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().info(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用原始方法
|
||
super().info(msg, *args, **kwargs)
|
||
|
||
def warning(self, msg, *args, **kwargs):
|
||
"""记录 WARNING 级别的日志"""
|
||
# 检查当前日志器的级别
|
||
if self.isEnabledFor(logging.WARNING):
|
||
# 获取调用者信息(在创建异步任务之前获取,确保调用栈正确)
|
||
caller_info = self._get_caller_info()
|
||
|
||
# 异步写入数据库
|
||
try:
|
||
import asyncio
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_running():
|
||
# 格式化消息,处理格式化失败的情况
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
asyncio.create_task(self._log_to_database(logging.WARNING, formatted_msg, caller_info=caller_info, **kwargs))
|
||
except Exception:
|
||
pass
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['WARNING']}{timestamp} - WARNING - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.WARNING, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().warning(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['WARNING'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - WARNING - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.WARNING, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().warning(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用原始方法
|
||
super().warning(msg, *args, **kwargs)
|
||
|
||
def error(self, msg, *args, **kwargs):
|
||
"""记录 ERROR 级别的日志"""
|
||
# 检查当前日志器的级别
|
||
if self.isEnabledFor(logging.ERROR):
|
||
# 获取调用者信息(在创建异步任务之前获取,确保调用栈正确)
|
||
caller_info = self._get_caller_info()
|
||
|
||
# 格式化消息,处理格式化失败的情况
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 写入数据库
|
||
try:
|
||
import asyncio
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_running():
|
||
# 事件循环已运行,创建异步任务
|
||
asyncio.create_task(self._log_to_database(logging.ERROR, formatted_msg, caller_info=caller_info, **kwargs))
|
||
else:
|
||
# 事件循环未运行,同步执行
|
||
# 避免在应用关闭时使用 asyncio.run(),防止事件循环错误
|
||
try:
|
||
loop = asyncio.get_event_loop()
|
||
if not loop.is_closed():
|
||
asyncio.create_task(self._log_to_database(logging.ERROR, formatted_msg, caller_info=caller_info, **kwargs))
|
||
except:
|
||
# 事件循环已关闭,跳过日志记录
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['ERROR']}{timestamp} - ERROR - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.ERROR, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().error(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['ERROR'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - ERROR - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.ERROR, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().error(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用原始方法
|
||
super().error(msg, *args, **kwargs)
|
||
|
||
def critical(self, msg, *args, **kwargs):
|
||
"""记录 CRITICAL 级别的日志"""
|
||
# 检查当前日志器的级别
|
||
if self.isEnabledFor(logging.CRITICAL):
|
||
# 获取调用者信息(在创建异步任务之前获取,确保调用栈正确)
|
||
caller_info = self._get_caller_info()
|
||
|
||
# 异步写入数据库
|
||
try:
|
||
import asyncio
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_running():
|
||
# 格式化消息,处理格式化失败的情况
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
asyncio.create_task(self._log_to_database(logging.CRITICAL, formatted_msg, caller_info=caller_info, **kwargs))
|
||
except Exception:
|
||
pass
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['CRITICAL']}{timestamp} - CRITICAL - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.CRITICAL, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().critical(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
try:
|
||
if args:
|
||
formatted_msg = msg % args
|
||
else:
|
||
formatted_msg = msg
|
||
except (TypeError, ValueError):
|
||
# 如果格式化失败,将参数拼接到消息后面
|
||
formatted_msg = f"{msg} {' '.join(map(str, args))}"
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['CRITICAL'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - CRITICAL - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
# 发送到日志流
|
||
self._send_to_log_stream(logging.CRITICAL, formatted_msg)
|
||
except Exception:
|
||
# 如果出错,使用原始方法
|
||
super().critical(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用原始方法
|
||
super().critical(msg, *args, **kwargs)
|
||
|
||
def success(self, action: str, subject: str = "", details: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录成功消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
details: 额外详情
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.success(action, subject, details)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def fail(self, action: str, subject: str = "", reason: str = "", to_file: bool = True) -> None:
|
||
"""
|
||
记录失败消息(默认同时写入文件)
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
reason: 失败原因
|
||
to_file: 是否同时写入文件,默认True
|
||
"""
|
||
msg = LogHelper.fail(action, subject, reason)
|
||
self.error(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.ERROR, msg)
|
||
|
||
def start(self, action: str, subject: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录开始消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.start(action, subject)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def stop(self, action: str, subject: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录结束消息
|
||
|
||
Args:
|
||
action: 操作名称
|
||
subject: 操作主体
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.stop(action, subject)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def status_change(self, subject: str, old_status: str, new_status: str, to_file: bool = False) -> None:
|
||
"""
|
||
记录状态变更消息
|
||
|
||
Args:
|
||
subject: 操作主体
|
||
old_status: 旧状态
|
||
new_status: 新状态
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.status_change(subject, old_status, new_status)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def api_response(self, api_name: str, status_code: int, details: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录API响应消息
|
||
|
||
Args:
|
||
api_name: API名称
|
||
status_code: HTTP状态码
|
||
details: 额外详情
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.api_response(api_name, status_code, details)
|
||
if 200 <= status_code < 300:
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
else:
|
||
self.error(msg)
|
||
self._log_to_file(logging.ERROR, msg)
|
||
|
||
def query(self, target: str, result: str = "", count: int = None, to_file: bool = False) -> None:
|
||
"""
|
||
记录查询消息
|
||
|
||
Args:
|
||
target: 查询目标
|
||
result: 查询结果描述
|
||
count: 结果数量
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.query(target, result, count)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def insert(self, target: str, subject: str = "", count: int = None, to_file: bool = False) -> None:
|
||
"""
|
||
记录插入消息
|
||
|
||
Args:
|
||
target: 插入目标
|
||
subject: 插入主体
|
||
count: 插入数量
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.insert(target, subject, count)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def update(self, target: str, subject: str = "", count: int = None, to_file: bool = False) -> None:
|
||
"""
|
||
记录更新消息
|
||
|
||
Args:
|
||
target: 更新目标
|
||
subject: 更新主体
|
||
count: 更新数量
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.update(target, subject, count)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def delete(self, target: str, subject: str = "", count: int = None, to_file: bool = False) -> None:
|
||
"""
|
||
记录删除消息
|
||
|
||
Args:
|
||
target: 删除目标
|
||
subject: 删除主体
|
||
count: 删除数量
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.delete(target, subject, count)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def warning_msg(self, subject: str, message: str, to_file: bool = True) -> None:
|
||
"""
|
||
记录警告消息(默认同时写入文件)
|
||
|
||
Args:
|
||
subject: 警告主体
|
||
message: 警告信息
|
||
to_file: 是否同时写入文件,默认True
|
||
"""
|
||
msg = LogHelper.warning(subject, message)
|
||
self.warning(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.WARNING, msg)
|
||
|
||
def sync(self, action: str, subject: str = "", details: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录同步消息
|
||
|
||
Args:
|
||
action: 同步操作名称
|
||
subject: 同步主体
|
||
details: 额外详情
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.sync(action, subject, details)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def connect(self, target: str, status: str = "成功", to_file: bool = False) -> None:
|
||
"""
|
||
记录连接消息
|
||
|
||
Args:
|
||
target: 连接目标
|
||
status: 连接状态
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.connect(target, status)
|
||
if status == "成功":
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
else:
|
||
self.error(msg)
|
||
self._log_to_file(logging.ERROR, msg)
|
||
|
||
def disconnect(self, target: str, to_file: bool = False) -> None:
|
||
"""
|
||
记录断开连接消息
|
||
|
||
Args:
|
||
target: 断开目标
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.disconnect(target)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
def cache(self, action: str, target: str = "", details: str = "", to_file: bool = False) -> None:
|
||
"""
|
||
记录缓存消息
|
||
|
||
Args:
|
||
action: 缓存操作
|
||
target: 缓存目标
|
||
details: 额外详情
|
||
to_file: 是否同时写入文件
|
||
"""
|
||
msg = LogHelper.cache(action, target, details)
|
||
self.info(msg)
|
||
if to_file:
|
||
self._log_to_file(logging.INFO, msg)
|
||
|
||
|
||
# 注册智能日志器类
|
||
logging.setLoggerClass(SmartLogger)
|
||
|
||
def setup_logger(name: str, level: str = 'INFO', auto_file: bool = True) -> logging.Logger:
|
||
"""
|
||
设置日志器
|
||
|
||
Args:
|
||
name: 日志器名称
|
||
level: 日志级别
|
||
auto_file: 是否自动关联文件日志器(支持 to_file 参数),默认True
|
||
|
||
Returns:
|
||
配置好的日志器实例
|
||
"""
|
||
logger_key = f"{name}:console"
|
||
if logger_key in logger_instances:
|
||
return logger_instances[logger_key]
|
||
|
||
# 创建 SmartLogger 实例
|
||
logger = SmartLogger(name)
|
||
logger.setLevel(get_log_level(level))
|
||
logger.propagate = False # 防止日志传播
|
||
|
||
# 自动关联文件日志器
|
||
if auto_file:
|
||
file_logger = get_file_logger(name)
|
||
logger.set_file_logger(file_logger)
|
||
|
||
# 存储日志器实例
|
||
logger_instances[logger_key] = logger
|
||
|
||
return logger
|
||
|
||
|
||
class SmartFileHandler(logging.Handler):
|
||
"""智能文件处理器,根据日志级别选择对应文件
|
||
|
||
日志级别与文件写入规则(受 LOG_LEVEL 控制,但 DEBUG/INFO 始终不写磁盘):
|
||
- DEBUG: 仅控制台输出,不写入任何文件
|
||
- INFO: 仅控制台输出,不写入任何文件
|
||
- WARNING: 写入 app.log
|
||
- ERROR: 写入 app.log 和 error.log
|
||
- CRITICAL: 写入 app.log 和 error.log
|
||
"""
|
||
|
||
def __init__(self, name: str):
|
||
super().__init__()
|
||
self.name = name
|
||
# 只创建 error.log 的文件日志器(WARNING 及以上级别)
|
||
self.default_logger = setup_file_logging(name, "app.log") # app.log (WARNING及以上)
|
||
self.default_logger.setLevel(logging.WARNING)
|
||
self.error_logger = setup_file_logging(name, "error.log") # error.log (ERROR及以上)
|
||
self.error_logger.setLevel(logging.ERROR)
|
||
# 不再创建 debug.log,因为 DEBUG 级别不写磁盘
|
||
|
||
def emit(self, record):
|
||
"""根据日志级别选择对应文件
|
||
|
||
重要:DEBUG 和 INFO 级别的日志永远不会写入磁盘文件,
|
||
无论 LOG_LEVEL 环境变量设置为何值。这是为了防止日志体积过大。
|
||
"""
|
||
# DEBUG 级别只输出到控制台,不写入任何文件
|
||
if record.levelno == logging.DEBUG:
|
||
pass # 不写入任何文件
|
||
# INFO 级别只输出到控制台,不写入任何文件
|
||
elif record.levelno == logging.INFO:
|
||
pass # 不写入任何文件
|
||
# WARNING 级别写入 app.log
|
||
elif record.levelno == logging.WARNING:
|
||
self.default_logger.handle(record)
|
||
# ERROR/CRITICAL 级别同时写入 error.log 和 app.log
|
||
elif record.levelno >= logging.ERROR:
|
||
self.error_logger.handle(record)
|
||
self.default_logger.handle(record)
|
||
|
||
|
||
def _create_smart_file_logger(name: str) -> logging.Logger:
|
||
"""
|
||
创建智能文件日志器,根据日志级别自动选择文件
|
||
|
||
Args:
|
||
name: 日志器名称
|
||
|
||
Returns:
|
||
智能文件日志器实例
|
||
"""
|
||
# 创建基础日志器
|
||
logger = logging.getLogger(name)
|
||
logger.setLevel(logging.DEBUG)
|
||
logger.propagate = False
|
||
|
||
# 添加智能文件处理器
|
||
smart_handler = SmartFileHandler(name)
|
||
logger.addHandler(smart_handler)
|
||
|
||
return logger
|
||
|
||
|
||
def get_file_logger(name: str) -> logging.Logger:
|
||
"""
|
||
获取智能文件日志器,根据级别自动选择文件
|
||
|
||
Args:
|
||
name: 日志器名称
|
||
|
||
Returns:
|
||
配置好的智能文件日志器实例
|
||
"""
|
||
# 创建智能文件日志器
|
||
key = f"{name}:smart"
|
||
if key in logger_instances:
|
||
return logger_instances[key]
|
||
|
||
logger = _create_smart_file_logger(name)
|
||
logger_instances[key] = logger
|
||
return logger
|
||
|
||
|
||
def get_logger(name: Optional[str] = None, level: str = 'INFO') -> logging.Logger:
|
||
"""
|
||
获取统一的日志器
|
||
|
||
Args:
|
||
name: 日志器名称,默认使用调用模块的名称
|
||
level: 日志级别,默认 'INFO',可设置为 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
|
||
|
||
Returns:
|
||
配置好的日志器实例
|
||
|
||
Note:
|
||
返回的日志器支持 to_file 参数,可控制是否同时写入文件:
|
||
- console_log.fail(...) # 默认 to_file=True,自动写入文件
|
||
- console_log.success(...) # 默认 to_file=False,仅控制台
|
||
- console_log.success(..., to_file=True) # 强制写入文件
|
||
"""
|
||
# 如果没有提供名称,自动获取调用模块的名称
|
||
if name is None:
|
||
import inspect
|
||
frame = inspect.currentframe()
|
||
try:
|
||
# 获取调用者的模块名
|
||
if frame and frame.f_back:
|
||
name = frame.f_back.f_globals.get('__name__', 'unknown')
|
||
else:
|
||
name = 'unknown'
|
||
finally:
|
||
if frame:
|
||
del frame
|
||
|
||
# 获取基础日志器
|
||
logger = setup_logger(name, level=level)
|
||
|
||
return logger
|
||
|
||
|
||
# ============================================================================
|
||
# 日志引擎切换(V1: logging / V2: loguru)
|
||
# ============================================================================
|
||
|
||
_use_loguru = None
|
||
|
||
def _check_use_loguru() -> bool:
|
||
"""
|
||
检查是否使用 loguru
|
||
|
||
逻辑:
|
||
1. 如果 USE_LOGURU=false,强制使用 V1
|
||
2. 如果 USE_LOGURU=true 或未设置,尝试使用 V2
|
||
3. 如果 loguru 未安装,自动回退到 V1
|
||
"""
|
||
from core.settings import USE_LOGURU
|
||
|
||
global _use_loguru
|
||
if _use_loguru is None:
|
||
try:
|
||
import os
|
||
use_loguru_env = USE_LOGURU
|
||
|
||
# 如果明确设置为 false,使用 V1
|
||
if use_loguru_env == "false":
|
||
_use_loguru = False
|
||
return _use_loguru
|
||
|
||
# 默认使用 V2,但需要检查 loguru 是否安装
|
||
try:
|
||
import loguru
|
||
_use_loguru = True
|
||
except ImportError:
|
||
# loguru 未安装,回退到 V1
|
||
import warnings
|
||
warnings.warn("loguru 未安装,回退到原生 logging")
|
||
_use_loguru = False
|
||
except Exception:
|
||
_use_loguru = False
|
||
return _use_loguru
|
||
|
||
|
||
def get_logger_unified(name: Optional[str] = None, level: str = 'INFO'):
|
||
"""
|
||
获取统一的日志器(自动选择 V1 或 V2)
|
||
|
||
根据环境变量 USE_LOGURU 和 loguru 安装状态决定使用哪个日志引擎:
|
||
- USE_LOGURU=false: 强制使用原生 logging (V1)
|
||
- USE_LOGURU=true 或未设置: 优先使用 loguru (V2),未安装则回退到 V1
|
||
|
||
Args:
|
||
name: 日志器名称
|
||
level: 日志级别
|
||
|
||
Returns:
|
||
SmartLogger (V1) 或 SmartLoggerV2 (V2)
|
||
"""
|
||
if _check_use_loguru():
|
||
try:
|
||
from globalobjects.logger_v2 import get_logger_v2
|
||
return get_logger_v2(name or "app", level)
|
||
except Exception:
|
||
pass
|
||
|
||
return get_logger(name, level)
|
||
|
||
|
||
def initialize_logging_unified() -> None:
|
||
"""
|
||
初始化日志系统(自动选择 V1 或 V2)
|
||
"""
|
||
if _check_use_loguru():
|
||
try:
|
||
from globalobjects.logger_v2 import initialize_logging_v2
|
||
initialize_logging_v2()
|
||
return
|
||
except Exception:
|
||
pass
|
||
|
||
initialize_logging()
|
||
|
||
|
||
def shutdown_logging_unified() -> None:
|
||
"""
|
||
关闭日志系统(自动选择 V1 或 V2)
|
||
"""
|
||
if _check_use_loguru():
|
||
try:
|
||
from globalobjects.logger_v2 import shutdown_logging_v2
|
||
shutdown_logging_v2()
|
||
return
|
||
except Exception:
|
||
pass
|
||
|
||
shutdown_logging()
|
||
|
||
|
||
def set_db_initialized_unified(initialized: bool = True) -> None:
|
||
"""
|
||
设置数据库初始化状态(V1 和 V2 通用)
|
||
"""
|
||
global db_initialized
|
||
db_initialized = initialized
|
||
|
||
# 同时设置 V2
|
||
if _check_use_loguru():
|
||
try:
|
||
from globalobjects.logger_v2 import set_db_initialized
|
||
set_db_initialized(initialized)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def initialize_logging() -> None:
|
||
"""
|
||
初始化日志系统
|
||
"""
|
||
# 配置根日志器
|
||
root_logger = logging.getLogger()
|
||
root_logger.setLevel(logging.INFO)
|
||
|
||
# 移除默认处理器
|
||
for handler in root_logger.handlers[:]:
|
||
root_logger.removeHandler(handler)
|
||
|
||
# 配置第三方库的日志级别,减少噪音
|
||
logging.getLogger('httpx').setLevel(logging.WARNING)
|
||
logging.getLogger('httpcore').setLevel(logging.WARNING)
|
||
logging.getLogger('uvicorn').setLevel(logging.INFO)
|
||
logging.getLogger('uvicorn.access').setLevel(logging.ERROR)
|
||
logging.getLogger('pymysqlreplication').setLevel(logging.WARNING)
|
||
# 启动文件日志监听器
|
||
start_all_listeners()
|
||
|
||
# 记录初始化信息
|
||
logger = get_logger(__name__)
|
||
logger.info("✅ 日志系统初始化完成")
|
||
|
||
|
||
def shutdown_logging() -> None:
|
||
"""
|
||
关闭日志系统
|
||
"""
|
||
# 关闭所有文件日志
|
||
close_logging()
|
||
|
||
# 清理日志器实例
|
||
logger_instances.clear()
|
||
|
||
# 记录关闭信息
|
||
logger = get_logger(__name__)
|
||
logger.info("✅ 日志系统已关闭")
|
||
|
||
|
||
# 便捷函数
|
||
|
||
def _send_to_log_stream(level: str, msg: Any, *args: Any):
|
||
"""
|
||
将日志发送到所有注册的日志流处理器
|
||
"""
|
||
try:
|
||
from apps.common.monitor.log_stream_service import _log_stream_manager
|
||
handlers = _log_stream_manager.get_handlers()
|
||
if not handlers:
|
||
return
|
||
|
||
formatted_msg = msg % args if args else str(msg)
|
||
|
||
import inspect
|
||
module = 'unknown'
|
||
func_name = 'unknown'
|
||
line_no = 0
|
||
|
||
try:
|
||
stack = inspect.stack()
|
||
|
||
# 需要跳过的模块/函数名称
|
||
skip_modules = {'asyncio', 'asyncio.events', 'globalobjects.logger', 'logging', 'uvicorn', 'uvicorn.server', 'uvicorn.protocols', 'uvicorn.workers'}
|
||
skip_functions = {'_run', '_log', '_log_to_file', '_log_to_db', 'info', 'debug', 'warning', 'error', 'critical', 'run', 'serve', 'handle', '_send_to_log_stream'}
|
||
|
||
caller_frame = None
|
||
|
||
for i, frame_info in enumerate(stack[1:]):
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
function = frame_info.function
|
||
|
||
class_name = None
|
||
if 'self' in frame.f_locals:
|
||
try:
|
||
class_name = frame.f_locals['self'].__class__.__name__
|
||
except Exception:
|
||
pass
|
||
|
||
# 跳过内部模块和函数
|
||
is_internal = False
|
||
if class_name == 'SmartLogger':
|
||
is_internal = True
|
||
elif module_name in skip_modules:
|
||
is_internal = True
|
||
elif function in skip_functions:
|
||
is_internal = True
|
||
elif module_name.startswith('asyncio'):
|
||
is_internal = True
|
||
elif module_name.startswith('logging'):
|
||
is_internal = True
|
||
elif module_name.startswith('uvicorn'):
|
||
is_internal = True
|
||
elif module_name.startswith('starlette'):
|
||
is_internal = True
|
||
elif module_name.startswith('fastapi'):
|
||
is_internal = True
|
||
|
||
if not is_internal:
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
# 如果没有找到合适的调用者,尝试从栈中寻找
|
||
if not caller_frame:
|
||
for i in range(1, min(len(stack), 20)):
|
||
frame_info = stack[i]
|
||
frame = frame_info.frame
|
||
module_name = frame.f_globals.get('__name__', '')
|
||
if not (module_name.startswith('asyncio') or
|
||
module_name.startswith('logging') or
|
||
module_name.startswith('uvicorn') or
|
||
module_name.startswith('starlette') or
|
||
module_name.startswith('fastapi') or
|
||
module_name == 'globalobjects.logger'):
|
||
caller_frame = frame_info
|
||
break
|
||
|
||
if caller_frame:
|
||
module = caller_frame.frame.f_globals.get('__name__', 'unknown')
|
||
func_name = caller_frame.function
|
||
line_no = caller_frame.lineno
|
||
|
||
except Exception:
|
||
pass
|
||
|
||
# 清理栈帧引用
|
||
if 'frame' in dir():
|
||
del frame
|
||
if 'stack' in dir():
|
||
del stack
|
||
|
||
record = logging.LogRecord(
|
||
name=module,
|
||
level=getattr(logging, level, logging.INFO),
|
||
pathname='',
|
||
lineno=line_no,
|
||
msg=formatted_msg,
|
||
args=(),
|
||
exc_info=None,
|
||
func=func_name
|
||
)
|
||
record.module = module
|
||
|
||
_log_stream_manager.emit_to_handlers(record)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def debug(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录 DEBUG 级别的日志
|
||
"""
|
||
# 检查日志级别,只有当日志级别允许时才输出
|
||
logger = get_logger()
|
||
if not logger.isEnabledFor(logging.DEBUG):
|
||
return
|
||
|
||
# 发送到日志流
|
||
_send_to_log_stream('DEBUG', msg, *args)
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['DEBUG']}{timestamp} - DEBUG - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().debug(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['DEBUG'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - DEBUG - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().debug(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用标准输出
|
||
get_logger().debug(msg, *args, **kwargs)
|
||
|
||
|
||
def info(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录 INFO 级别的日志
|
||
"""
|
||
# 发送到日志流
|
||
_send_to_log_stream('INFO', msg, *args)
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['INFO']}{timestamp} - INFO - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().info(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['INFO'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - INFO - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().info(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用标准输出
|
||
get_logger().info(msg, *args, **kwargs)
|
||
|
||
|
||
def warning(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录 WARNING 级别的日志
|
||
"""
|
||
# 发送到日志流
|
||
_send_to_log_stream('WARNING', msg, *args)
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['WARNING']}{timestamp} - WARNING - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().warning(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['WARNING'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - WARNING - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().warning(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用标准输出
|
||
get_logger().warning(msg, *args, **kwargs)
|
||
|
||
|
||
def error(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录 ERROR 级别的日志
|
||
"""
|
||
# 发送到日志流
|
||
_send_to_log_stream('ERROR', msg, *args)
|
||
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['ERROR']}{timestamp} - ERROR - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().error(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['ERROR'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - ERROR - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().error(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用标准输出
|
||
get_logger().error(msg, *args, **kwargs)
|
||
|
||
|
||
def critical(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录 CRITICAL 级别的日志
|
||
"""
|
||
# 发送到日志流
|
||
_send_to_log_stream('CRITICAL', msg, *args)
|
||
if TERMINAL_SUPPORTS_ANSI:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 使用ANSI颜色代码
|
||
print(f"{ANSI_COLORS['CRITICAL']}{timestamp} - CRITICAL - {formatted_msg}{ANSI_COLORS['RESET']}")
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().critical(msg, *args, **kwargs)
|
||
elif SUPPORT_WINDOWS_API:
|
||
try:
|
||
# 获取当前时间
|
||
timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
|
||
|
||
# 格式化日志消息
|
||
formatted_msg = msg % args
|
||
|
||
# 设置控制台颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, LEVEL_COLORS['CRITICAL'])
|
||
|
||
# 输出日志消息
|
||
print(f"{timestamp} - CRITICAL - {formatted_msg}")
|
||
|
||
# 恢复原始颜色
|
||
ctypes.windll.kernel32.SetConsoleTextAttribute(hConsole, original_color)
|
||
except Exception:
|
||
# 如果出错,使用标准输出
|
||
get_logger().critical(msg, *args, **kwargs)
|
||
else:
|
||
# 如果不支持任何颜色输出,使用标准输出
|
||
get_logger().critical(msg, *args, **kwargs)
|
||
|
||
|
||
def exception(msg: Any, *args: Any, **kwargs: Any) -> None:
|
||
"""
|
||
记录异常信息
|
||
"""
|
||
get_logger().exception(msg, *args, **kwargs)
|
||
|
||
|
||
# 导出的便捷日志器
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
"""
|
||
日志模块使用范例
|
||
"""
|
||
print("=== 日志模块使用范例 ===")
|
||
|
||
# 1. 初始化日志系统
|
||
print("\n1. 初始化日志系统:")
|
||
initialize_logging()
|
||
|
||
# 2. 基本使用 - 获取日志器
|
||
print("\n2. 基本使用 - 获取日志器:")
|
||
# 自动识别模块名
|
||
logger1 = get_logger()
|
||
print(f" ✅ 获取默认日志器: {logger1.name}")
|
||
|
||
# 手动指定模块名
|
||
logger2 = get_logger("my_module")
|
||
print(f" ✅ 获取指定模块日志器: {logger2.name}")
|
||
|
||
# 3. 测试不同级别的日志
|
||
print("\n3. 测试不同级别的日志:")
|
||
logger = get_logger("test_logger")
|
||
logger.debug("这是一条 DEBUG 级别的日志")
|
||
logger.info("这是一条 INFO 级别的日志")
|
||
logger.warning("这是一条 WARNING 级别的日志")
|
||
logger.error("这是一条 ERROR 级别的日志")
|
||
logger.critical("这是一条 CRITICAL 级别的日志")
|
||
|
||
# 4. 测试异常日志
|
||
print("\n4. 测试异常日志:")
|
||
try:
|
||
1 / 0
|
||
except Exception as e:
|
||
logger.exception("发生了一个异常")
|
||
|
||
# 5. 测试智能文件日志器
|
||
print("\n5. 测试智能文件日志器:")
|
||
file_logger = get_file_logger("file_test")
|
||
file_logger.info("这是一条写入文件的 INFO 日志")
|
||
file_logger.error("这是一条写入文件的 ERROR 日志")
|
||
|
||
# 5.1 测试智能文件日志器
|
||
print("\n5.1 测试智能文件日志器:")
|
||
smart_logger = get_file_logger("smart_test")
|
||
print(" ✅ 创建智能文件日志器")
|
||
# 启动文件日志监听器(确保新创建的监听器被启动)
|
||
start_all_listeners()
|
||
print(" ✅ 启动文件日志监听器")
|
||
# 测试不同级别的日志
|
||
smart_logger.debug("这是一条智能 DEBUG 级别的日志")
|
||
smart_logger.info("这是一条智能 INFO 级别的日志")
|
||
smart_logger.warning("这是一条智能 WARNING 级别的日志")
|
||
smart_logger.error("这是一条智能 ERROR 级别的日志")
|
||
smart_logger.critical("这是一条智能 CRITICAL 级别的日志")
|
||
# 等待日志写入
|
||
import time
|
||
time.sleep(0.5)
|
||
print(" ✅ 智能文件日志器测试完成")
|
||
|
||
# 6. 使用便捷函数
|
||
print("\n6. 使用便捷函数:")
|
||
debug("使用便捷函数记录 DEBUG 日志")
|
||
info("使用便捷函数记录 INFO 日志")
|
||
warning("使用便捷函数记录 WARNING 日志")
|
||
error("使用便捷函数记录 ERROR 日志")
|
||
critical("使用便捷函数记录 CRITICAL 日志")
|
||
|
||
# 7. 测试应用生命周期管理
|
||
print("\n7. 测试应用生命周期管理:")
|
||
print(" 模拟应用运行中...")
|
||
|
||
# 8. 关闭日志系统
|
||
print("\n8. 关闭日志系统:")
|
||
shutdown_logging()
|
||
|
||
print("\n=== 使用范例结束 ===")
|
||
print("\n完整使用流程:")
|
||
print("1. 导入: from globalobjects import logger as log_config")
|
||
print("2. 初始化: log_config.initialize_logging() (应用启动时)")
|
||
print("3. 获取日志器: logger = log_config.get_logger(__name__)")
|
||
print("4. 记录日志: logger.info('日志内容')")
|
||
print("5. 关闭日志: log_config.shutdown_logging() (应用关闭时)")
|
||
print("\n文件日志使用:")
|
||
print("file_logger = log_config.get_file_logger(__name__, 'default')")
|
||
print("file_logger.info('文件日志内容')")
|
||
print("\n智能文件日志器使用:")
|
||
print("smart_logger = log_config.get_file_logger(__name__, smart=True)")
|
||
print("smart_logger.info('智能文件日志内容')") |