Files
2026-05-13 17:16:49 +08:00

976 lines
34 KiB
Python

"""
事件聚合管理器 - 用于批量处理同类事件
主要功能:
1. 收集同类事件到缓冲区
2. 定时批量处理事件
3. 支持去重
4. 支持分组处理
5. 支持独立线程池隔离
线程池策略:
- 每个事件类型拥有独立的线程池,避免争抢
- 线程池大小根据事件类型静态配置
- 支持线程池监控
"""
import os
import threading
import time
from collections import defaultdict, deque
from typing import Callable, Any, List, Dict, Set, Optional
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from globalobjects import logger as log_config
import os
LOG_LEVEL = os.getenv("LOG_LEVEL") or "INFO"
logger = log_config.get_logger(__name__, level=LOG_LEVEL)
import multiprocessing
CPU_COUNT = multiprocessing.cpu_count() or 4
class EventThreadPoolManager:
"""事件类型独立线程池管理器"""
_instance = None
_lock = Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return
self._pools: Dict[str, ThreadPoolExecutor] = {}
self._pool_configs: Dict[str, Dict] = {}
self._pool_locks: Dict[str, Lock] = {}
self._global_lock = Lock()
# 动态调整相关
self._pool_stats: Dict[str, Dict] = {} # 线程池运行时统计
self._adjust_thread: Optional[threading.Thread] = None
self._adjust_running = False
self._adjust_interval = 10 # 调整检查间隔(秒)
self._scale_up_threshold = 50 # 队列长度超过此值时扩容
self._scale_down_threshold = 5 # 队列长度低于此值时缩容
self._cooldown_time = 30 # 调整冷却时间(秒)
self._initialized = True
self._register_default_pools()
self._start_adjust_thread()
def _register_default_pools(self):
"""注册默认线程池配置"""
default_configs = {
'pl_status_a2e': {
'min_workers': 3,
'max_workers': 10,
'description': 'PL状态变更A2E事件'
},
'pr_created': {
'min_workers': 2,
'max_workers': 6,
'description': 'PR创建事件'
},
'pl_to_mo': {
'min_workers': 2,
'max_workers': 6,
'description': 'PL变更为MO事件'
},
'pr_deleted': {
'min_workers': 1,
'max_workers': 4,
'description': 'PR删除事件'
},
'default': {
'min_workers': 2,
'max_workers': 5,
'description': '默认事件'
}
}
for name, config in default_configs.items():
self.register_pool(name, **config)
def register_pool(self, name: str, min_workers: int = 2, max_workers: int = 5, description: str = ''):
"""注册一个事件类型的线程池"""
with self._global_lock:
if name in self._pools:
logger.warning(f"线程池 {name} 已存在,将重新创建")
self._pools[name].shutdown(wait=False)
del self._pools[name]
self._pool_configs[name] = {
'min_workers': min_workers,
'max_workers': max_workers,
'description': description
}
self._pool_locks[name] = Lock()
self._pools[name] = ThreadPoolExecutor(
max_workers=min_workers,
thread_name_prefix=f'event-{name}-'
)
logger.success(f"事件线程池注册", name, f"min={min_workers}, max={max_workers}")
def get_pool(self, name: str) -> Optional[ThreadPoolExecutor]:
"""获取事件类型的线程池,如果不存在则自动注册"""
with self._global_lock:
if name not in self._pools:
logger.info(f"线程池 {name} 不存在,自动注册独立线程池")
self._pool_configs[name] = {
'min_workers': 2,
'max_workers': 5,
'description': f'自动注册的事件线程池: {name}'
}
self._pool_locks[name] = Lock()
self._pools[name] = ThreadPoolExecutor(
max_workers=2,
thread_name_prefix=f'event-{name}-'
)
logger.success(f"事件线程池自动注册", name, "min=2, max=5")
return self._pools.get(name)
def get_pool_stats(self, name: str) -> Dict:
"""获取线程池统计信息"""
with self._global_lock:
if name not in self._pool_configs:
return {}
config = self._pool_configs[name]
pool = self._pools.get(name)
return {
'name': name,
'min_workers': config['min_workers'],
'max_workers': config['max_workers'],
'description': config['description'],
'pool_active': pool is not None and not pool._shutdown,
}
def get_all_stats(self) -> Dict[str, Dict]:
"""获取所有线程池的统计信息"""
return {name: self.get_pool_stats(name) for name in self._pool_configs.keys()}
def shutdown_all(self):
"""关闭所有线程池"""
self._stop_adjust_thread()
with self._global_lock:
for name, pool in self._pools.items():
pool.shutdown(wait=True)
logger.info(f"线程池已关闭", name)
self._pools.clear()
def _start_adjust_thread(self):
"""启动动态调整监控线程"""
if self._adjust_thread is not None and self._adjust_thread.is_alive():
return
self._adjust_running = True
self._adjust_thread = threading.Thread(
target=self._adjust_loop,
daemon=True,
name='event-pool-adjuster'
)
self._adjust_thread.start()
logger.info("✅ 事件线程池动态调整线程已启动")
def _stop_adjust_thread(self):
"""停止动态调整监控线程"""
self._adjust_running = False
if self._adjust_thread and self._adjust_thread.is_alive():
self._adjust_thread.join(timeout=5)
logger.info("🛑 事件线程池动态调整线程已停止")
def _adjust_loop(self):
"""动态调整循环"""
while self._adjust_running:
try:
self._check_and_adjust_pools()
except Exception as e:
logger.error(f"线程池动态调整检查失败: {e}")
time.sleep(self._adjust_interval)
def _check_and_adjust_pools(self):
"""检查并调整所有线程池"""
with self._global_lock:
for name in list(self._pools.keys()):
try:
self._adjust_pool_size(name)
except Exception as e:
logger.warning(f"调整线程池 {name} 失败: {e}")
def _adjust_pool_size(self, name: str):
"""
调整单个线程池的大小
扩容条件:队列长度 > scale_up_threshold
缩容条件:队列长度 < scale_down_threshold 且持续一段时间
"""
if name not in self._pools or name not in self._pool_configs:
return
pool = self._pools[name]
config = self._pool_configs[name]
if pool._shutdown:
return
# 获取队列长度
queue_size = self._get_queue_size(pool)
current_workers = getattr(pool, '_max_workers', config['min_workers'])
min_workers = config['min_workers']
max_workers = config['max_workers']
# 初始化统计
if name not in self._pool_stats:
self._pool_stats[name] = {
'last_adjust_time': 0,
'low_queue_count': 0,
'current_workers': current_workers
}
stats = self._pool_stats[name]
now = time.time()
# 检查冷却时间
if now - stats['last_adjust_time'] < self._cooldown_time:
return
target_workers = current_workers
# 扩容逻辑
if queue_size > self._scale_up_threshold and current_workers < max_workers:
target_workers = min(current_workers + 1, max_workers)
logger.info(f"📈 线程池 {name} 队列堆积({queue_size}),扩容: {current_workers}{target_workers}")
# 缩容逻辑(需要连续多次检测队列都较低)
elif queue_size < self._scale_down_threshold:
stats['low_queue_count'] = (stats.get('low_queue_count') or 0) + 1
if stats['low_queue_count'] >= 3 and current_workers > min_workers:
target_workers = max(current_workers - 1, min_workers)
logger.info(f"📉 线程池 {name} 队列空闲({queue_size}),缩容: {current_workers}{target_workers}")
stats['low_queue_count'] = 0
else:
stats['low_queue_count'] = 0
# 执行调整
if target_workers != current_workers:
self._resize_pool(name, target_workers)
stats['last_adjust_time'] = now
stats['current_workers'] = target_workers
def _get_queue_size(self, pool: ThreadPoolExecutor) -> int:
"""获取线程池工作队列大小"""
try:
if hasattr(pool, '_work_queue'):
return pool._work_queue.qsize()
return 0
except Exception:
return 0
def _resize_pool(self, name: str, new_size: int):
"""
调整线程池大小
注意:Python ThreadPoolExecutor 不支持动态调整 max_workers
这里采用创建新线程池的方式实现
"""
if name not in self._pools:
return
old_pool = self._pools[name]
config = self._pool_configs[name]
# 创建新线程池
new_pool = ThreadPoolExecutor(
max_workers=new_size,
thread_name_prefix=f'event-{name}-'
)
# 替换线程池(旧线程池会在现有任务完成后自动关闭)
self._pools[name] = new_pool
# 标记旧线程池为待关闭
try:
old_pool.shutdown(wait=False)
except Exception:
pass
logger.success(f"线程池大小调整", name, f"workers={new_size}")
def get_pool_detailed_stats(self, name: str) -> Dict:
"""获取线程池详细统计信息"""
with self._global_lock:
if name not in self._pool_configs:
return {}
config = self._pool_configs[name]
pool = self._pools.get(name)
stats = self._pool_stats.get(name, {})
queue_size = 0
active_count = 0
if pool and not pool._shutdown:
queue_size = self._get_queue_size(pool)
if hasattr(pool, '_threads'):
active_count = len([t for t in pool._threads if t.is_alive()])
return {
'name': name,
'min_workers': config['min_workers'],
'max_workers': config['max_workers'],
'current_workers': stats.get('current_workers', config['min_workers']),
'active_threads': active_count,
'queue_size': queue_size,
'description': config['description'],
'pool_active': pool is not None and not pool._shutdown,
'last_adjust_time': stats.get('last_adjust_time', 0),
}
def get_all_detailed_stats(self) -> Dict[str, Dict]:
"""获取所有线程池的详细统计信息"""
return {name: self.get_pool_detailed_stats(name) for name in self._pool_configs.keys()}
def set_adjust_config(self,
adjust_interval: int = None,
scale_up_threshold: int = None,
scale_down_threshold: int = None,
cooldown_time: int = None):
"""
配置动态调整参数
Args:
adjust_interval: 调整检查间隔(秒)
scale_up_threshold: 扩容阈值(队列长度)
scale_down_threshold: 缩容阈值(队列长度)
cooldown_time: 调整冷却时间(秒)
"""
if adjust_interval is not None:
self._adjust_interval = adjust_interval
if scale_up_threshold is not None:
self._scale_up_threshold = scale_up_threshold
if scale_down_threshold is not None:
self._scale_down_threshold = scale_down_threshold
if cooldown_time is not None:
self._cooldown_time = cooldown_time
logger.info(f"动态调整参数已更新: interval={self._adjust_interval}s, "
f"scale_up={self._scale_up_threshold}, scale_down={self._scale_down_threshold}, "
f"cooldown={self._cooldown_time}s")
_event_pool_manager: Optional[EventThreadPoolManager] = None
def get_event_pool_manager() -> EventThreadPoolManager:
"""获取事件线程池管理器单例"""
global _event_pool_manager
if _event_pool_manager is None:
_event_pool_manager = EventThreadPoolManager()
return _event_pool_manager
class EventAggregator:
"""事件聚合管理器"""
def __init__(self,
handler: Callable[[List[Any]], None],
group_key: Callable[[Any], str] = None,
dedup_key: Callable[[Any], str] = None,
batch_size: int = 10000,
quiet_window: float = 10.0,
flush_interval: float = 5.0,
name: str = "unnamed"):
"""
初始化事件聚合器
Args:
handler: 批量处理函数,接收事件列表
group_key: 分组函数,返回分组键,用于将事件分组处理
dedup_key: 去重函数,返回去重键,相同键的事件会被去重
batch_size: 批量处理的最大事件数
quiet_window: 安静窗口(秒),连续无新事件达到此时间后触发批量处理
flush_interval: 定时刷新间隔(秒),仅在缓冲区为空时生效
name: 聚合器名称,用于日志和调试
"""
self.handler = handler
self.group_key = group_key
self.dedup_key = dedup_key
self.batch_size = batch_size
self.quiet_window = quiet_window
self.flush_interval = flush_interval
self.name = name
self._last_event_time = 0.0
self._last_flush_time = 0
# 缓冲区:{group_key: {dedup_key: event}}
self._buffer: Dict[str, Dict[str, Any]] = defaultdict(dict)
self._lock = threading.RLock()
self._condition = threading.Condition(self._lock)
self._running = False
self._condition_thread = None
# 统计数据
self.stats = {
'total_received': 0,
'total_processed': 0,
'total_failed': 0,
'processing_latencies': deque(maxlen=1000),
'received_timestamps': deque(maxlen=3600),
'processed_timestamps': deque(maxlen=3600),
'first_received_time': None,
'last_activity_time': None,
}
def add(self, event: Any):
"""添加单个事件到缓冲区"""
with self._lock:
now = time.time()
# 更新统计数据
self.stats['total_received'] += 1
self.stats['received_timestamps'].append(now)
if self.stats['first_received_time'] is None:
self.stats['first_received_time'] = now
self.stats['last_activity_time'] = now
# 重置安静窗口计时器
self._last_event_time = now
# 计算分组键
g_key = self.group_key(event) if self.group_key else "__default__"
# 计算去重键
if self.dedup_key:
d_key = self.dedup_key(event)
self._buffer[g_key][d_key] = event
else:
# 无去重时,使用索引作为键
idx = len(self._buffer[g_key])
self._buffer[g_key][str(idx)] = event
# 检查是否达到批量大小
total_count = sum(len(events) for events in self._buffer.values())
if total_count >= self.batch_size:
# 达到批量大小,立即刷新
self._flush()
self._last_flush_time = now
self._last_event_time = now
def add_batch(self, events: List[Any]):
"""批量添加事件"""
for event in events:
self.add(event)
def _flush(self):
"""刷新缓冲区,处理所有事件"""
with self._lock:
if not self._buffer:
return
# 复制缓冲区数据
buffer_copy = dict(self._buffer)
self._buffer.clear()
# 提交到事件专属线程池处理,实现批次间并行且隔离
pool_manager = get_event_pool_manager()
event_pool = pool_manager.get_pool(self.name)
if event_pool:
event_pool.submit(self._process_batch, buffer_copy)
else:
logger.warning(f"未找到事件线程池 {self.name},使用默认线程池")
default_pool = pool_manager.get_pool('default')
if default_pool:
default_pool.submit(self._process_batch, buffer_copy)
def _process_batch(self, buffer_copy):
"""处理单个批次的事件"""
start_time = time.time()
total_events = sum(len(events_dict.values()) for events_dict in buffer_copy.values())
try:
for g_key, events_dict in buffer_copy.items():
events = list(events_dict.values())
if events:
logger.debug(f"处理分组{g_key}{len(events)}个事件")
self.handler(events)
# 更新成功统计
end_time = time.time()
with self._lock:
self.stats['total_processed'] += total_events
self.stats['processed_timestamps'].append(end_time)
self.stats['processing_latencies'].append((end_time - start_time) * 1000)
self.stats['last_activity_time'] = end_time
except Exception as e:
# 更新失败统计
end_time = time.time()
with self._lock:
self.stats['total_failed'] += total_events
self.stats['last_activity_time'] = end_time
logger.fail("批量处理事件", "", str(e))
def get_stats(self):
"""获取统计数据
Returns:
dict: 包含统计信息的字典
"""
with self._lock:
stats = self.stats.copy()
now = time.time()
# 计算待处理数
pending_count = sum(len(events) for events in self._buffer.values())
# 计算时间窗口统计
one_minute_ago = now - 60
one_hour_ago = now - 3600
today_start = now - (now % 86400)
events_last_minute = sum(1 for ts in stats['received_timestamps'] if ts >= one_minute_ago)
events_last_hour = sum(1 for ts in stats['received_timestamps'] if ts >= one_hour_ago)
events_today = sum(1 for ts in stats['received_timestamps'] if ts >= today_start)
# 计算成功率
total = stats['total_processed'] + stats['total_failed']
success_rate = (stats['total_processed'] / total * 100) if total > 0 else 100.0
# 计算平均延迟
avg_latency = 0.0
if stats['processing_latencies']:
avg_latency = sum(stats['processing_latencies']) / len(stats['processing_latencies'])
return {
'total_received': stats['total_received'],
'total_processed': stats['total_processed'],
'total_failed': stats['total_failed'],
'pending_count': pending_count,
'success_rate': success_rate,
'avg_processing_latency': avg_latency,
'last_activity_time': stats['last_activity_time'],
'first_received_time': stats['first_received_time'],
'batch_size': self.batch_size,
'flush_interval': self.flush_interval,
'current_buffer_size': pending_count,
'events_last_minute': events_last_minute,
'events_last_hour': events_last_hour,
'events_today': events_today,
}
def reset_stats(self):
"""重置统计数据"""
with self._lock:
self.stats = {
'total_received': 0,
'total_processed': 0,
'total_failed': 0,
'processing_latencies': deque(maxlen=1000),
'received_timestamps': deque(maxlen=3600),
'processed_timestamps': deque(maxlen=3600),
'first_received_time': None,
'last_activity_time': None,
}
def _condition_thread_func(self):
"""条件变量线程函数 - 惰性刷新:连续 quiet_window 无新事件时触发批量处理"""
while self._running:
with self._lock:
total_count = sum(len(events) for events in self._buffer.values())
if total_count == 0:
# 缓冲区为空,等待 flush_interval
self._condition.wait(timeout=self.flush_interval)
else:
# 缓冲区有数据,等待安静窗口
wait_time = self.quiet_window
self._condition.wait(timeout=wait_time)
# 检查是否需要刷新:缓冲区不为空且连续 quiet_window 无新事件
if self._buffer:
elapsed_since_last_event = time.time() - self._last_event_time
if elapsed_since_last_event >= self.quiet_window:
self._last_flush_time = time.time()
self._flush()
def start(self):
"""启动聚合器"""
if not self._running:
self._running = True
# 启动条件变量线程
self._condition_thread = threading.Thread(
target=self._condition_thread_func,
name=f"event-aggregator-{self.name}"
)
self._condition_thread.daemon = True
self._condition_thread.start()
logger.start(f"事件聚合器: {self.name}")
def stop(self):
"""停止聚合器"""
if self._running:
self._running = False
# 通知条件变量线程结束
with self._lock:
self._condition.notify()
if self._condition_thread:
logger.debug(f"等待事件聚合器线程结束: {self.name}")
self._condition_thread.join(timeout=5.0)
if self._condition_thread.is_alive():
logger.warning(f"事件聚合器线程未能正常结束: {self.name}")
self._condition_thread = None
# 停止前刷新剩余事件
self._flush()
logger.stop(f"事件聚合器: {self.name}")
def flush_now(self):
"""立即刷新缓冲区"""
self._flush()
class MultiEventAggregator:
"""多类型事件聚合管理器,管理多个不同类型的聚合器"""
def __init__(self):
self._aggregators: Dict[str, EventAggregator] = {}
self._event_descriptions: Dict[str, str] = {}
self._lock = threading.RLock()
def register(self,
event_type: str,
handler: Callable[[List[Any]], None],
group_key: Callable[[Any], str] = None,
dedup_key: Callable[[Any], str] = None,
batch_size: int = 10000,
quiet_window: float = 15.0,
flush_interval: float = 5.0,
description: str = None) -> 'MultiEventAggregator':
"""
注册一个事件类型的聚合器
Args:
event_type: 事件类型标识
handler: 批量处理函数
group_key: 分组函数
dedup_key: 去重函数
batch_size: 批量大小
quiet_window: 安静窗口(秒),连续无新事件达到此时间后触发批量处理
flush_interval: 刷新间隔(秒),仅在缓冲区为空时生效
description: 事件类型描述
Returns:
self,支持链式调用
"""
with self._lock:
# 检查事件类型是否已经注册
if event_type in self._aggregators:
logger.warning(f"事件类型 {event_type} 已经注册,将停止并重新注册")
self._aggregators[event_type].stop()
del self._aggregators[event_type]
aggregator = EventAggregator(
handler=handler,
group_key=group_key,
dedup_key=dedup_key,
batch_size=batch_size,
quiet_window=quiet_window,
flush_interval=flush_interval,
name=event_type
)
self._aggregators[event_type] = aggregator
self._event_descriptions[event_type] = description or event_type
aggregator.start()
logger.success(f"事件聚合器注册", event_type, "")
return self
def add(self, event_type: str, event: Any):
"""添加事件到指定类型的聚合器"""
description = self._event_descriptions.get(event_type, event_type)
with self._lock:
if event_type in self._aggregators:
logger.start(f"添加事件到聚合器,刷新间隔{self._aggregators[event_type].flush_interval}", event_type)
logger.debug(f"{event}")
self._aggregators[event_type].add(event)
def add_batch(self, event_type: str, events: List[Any]):
"""批量添加事件到指定类型的聚合器"""
with self._lock:
if event_type in self._aggregators:
logger.start(f"批量添加事件到聚合器,刷新间隔{self._aggregators[event_type].flush_interval}", event_type)
logger.debug(f"{events}")
self._aggregators[event_type].add_batch(events)
def stop(self, event_type: str = None):
"""停止聚合器
Args:
event_type: 指定事件类型,None表示停止所有
"""
with self._lock:
if event_type:
if event_type in self._aggregators:
self._aggregators[event_type].stop()
del self._aggregators[event_type]
if event_type in self._event_descriptions:
del self._event_descriptions[event_type]
else:
for aggregator in self._aggregators.values():
aggregator.stop()
self._aggregators.clear()
self._event_descriptions.clear()
def flush_now(self, event_type: str = None):
"""立即刷新
Args:
event_type: 指定事件类型,None表示刷新所有
"""
with self._lock:
if event_type:
if event_type in self._aggregators:
logger.start(f"立即刷新聚合器{event_type},刷新间隔{self._aggregators[event_type].flush_interval}")
self._aggregators[event_type].flush_now()
else:
for aggregator in self._aggregators.values():
aggregator.flush_now()
def get_all_stats(self):
"""获取所有事件类型的统计数据
Returns:
dict: 包含各事件类型统计数据的字典
"""
with self._lock:
stats = {}
for event_type, aggregator in self._aggregators.items():
stats[event_type] = aggregator.get_stats()
return stats
def get_event_types(self):
"""获取已注册的事件类型列表
Returns:
list: 事件类型列表
"""
with self._lock:
return list(self._aggregators.keys())
def get_event_description(self, event_type: str) -> str:
"""获取事件类型的描述
Args:
event_type: 事件类型标识
Returns:
str: 事件类型描述,如果不存在则返回事件类型本身
"""
with self._lock:
return self._event_descriptions.get(event_type, event_type)
def get_all_event_descriptions(self) -> Dict[str, str]:
"""获取所有事件类型的描述
Returns:
dict: 事件类型到描述的映射
"""
with self._lock:
return dict(self._event_descriptions)
def reset_stats(self, event_type: str = None):
"""重置统计数据
Args:
event_type: 指定事件类型,None表示重置所有
"""
with self._lock:
if event_type:
if event_type in self._aggregators:
self._aggregators[event_type].reset_stats()
else:
for aggregator in self._aggregators.values():
aggregator.reset_stats()
# 全局多事件聚合管理器实例
_global_handler_aggregator = MultiEventAggregator()
def get_global_handler_aggregator() -> MultiEventAggregator:
"""获取全局处理事件聚合管理器"""
return _global_handler_aggregator
# 使用示例
if __name__ == "__main__":
import time
# 示例1: 基本使用
def basic_handler(events):
print(f"基本处理: 收到 {len(events)} 个事件")
for event in events:
print(f" - 事件: {event}")
# 创建聚合器
aggregator1 = EventAggregator(
handler=basic_handler,
batch_size=5,
flush_interval=2.0
)
# 启动聚合器
aggregator1.start()
# 添加事件
print("示例1: 基本使用")
for i in range(7):
aggregator1.add(f"事件{i}")
time.sleep(0.5)
# 等待一段时间让聚合器处理事件
time.sleep(3)
# 停止聚合器
aggregator1.stop()
print("\n" + "-" * 50 + "\n")
# 示例2: 使用去重功能
def dedup_handler(events):
print(f"去重处理: 收到 {len(events)} 个事件")
for event in events:
print(f" - 事件: {event}")
# 去重函数: 使用事件内容作为去重键
def dedup_key_func(event):
return event
aggregator2 = EventAggregator(
handler=dedup_handler,
dedup_key=dedup_key_func,
batch_size=3,
flush_interval=1.0
)
aggregator2.start()
print("示例2: 使用去重功能")
# 添加重复事件
aggregator2.add("重复事件")
aggregator2.add("唯一事件1")
aggregator2.add("重复事件") # 这个会被去重
aggregator2.add("唯一事件2")
aggregator2.add("重复事件") # 这个会被去重
time.sleep(2)
aggregator2.stop()
print("\n" + "-" * 50 + "\n")
# 示例3: 使用分组功能
def group_handler(events):
print(f"分组处理: 收到 {len(events)} 个事件")
for event in events:
print(f" - 事件: {event}")
# 分组函数: 根据事件类型分组
def group_key_func(event):
return event["type"]
aggregator3 = EventAggregator(
handler=group_handler,
group_key=group_key_func,
batch_size=4,
flush_interval=1.5
)
aggregator3.start()
print("示例3: 使用分组功能")
# 添加不同类型的事件
aggregator3.add({"type": "user", "data": "用户1"})
aggregator3.add({"type": "order", "data": "订单1"})
aggregator3.add({"type": "user", "data": "用户2"})
aggregator3.add({"type": "order", "data": "订单2"})
aggregator3.add({"type": "user", "data": "用户3"})
time.sleep(2)
aggregator3.stop()
print("\n" + "-" * 50 + "\n")
# 示例4: 使用MultiEventAggregator
def user_handler(events):
print(f"用户事件处理: 收到 {len(events)} 个事件")
def order_handler(events):
print(f"订单事件处理: 收到 {len(events)} 个事件")
multi_aggregator = MultiEventAggregator()
# 注册不同类型的事件处理器
multi_aggregator.register(
event_type="user",
handler=user_handler,
batch_size=3,
flush_interval=1.0
).register(
event_type="order",
handler=order_handler,
batch_size=2,
flush_interval=1.5
)
print("示例4: 使用MultiEventAggregator")
# 添加不同类型的事件
multi_aggregator.add("user", "用户事件1")
multi_aggregator.add("order", "订单事件1")
multi_aggregator.add("user", "用户事件2")
multi_aggregator.add("order", "订单事件2")
multi_aggregator.add("user", "用户事件3")
time.sleep(2)
multi_aggregator.stop()
print("\n" + "-" * 50 + "\n")
# 示例5: 使用全局聚合器
def global_handler(events):
print(f"全局处理: 收到 {len(events)} 个事件")
global_aggregator = get_global_handler_aggregator()
# 注册事件类型
global_aggregator.register(
event_type="global_event",
handler=global_handler,
batch_size=4,
flush_interval=2.0
)
print("示例5: 使用全局聚合器")
# 添加事件
for i in range(6):
global_aggregator.add("global_event", f"全局事件{i}")
time.sleep(0.3)
time.sleep(3)
global_aggregator.stop("global_event")
print("\n所有示例执行完成!")