0%

记一次开发过程中涉及性能问题分析的乌龙事件,顺便谈谈python的装饰器

在开发QQ机器人项目的过程中,我经历了一次颇具戏剧性的性能问题排查。这次乌龙事件不仅让我对Python的装饰器机制有了更深入的理解,也让我明白了细节对于程序性能的重要性。在此,我想分享整个排查过程,并深入探讨Python的装饰器。

问题的起因

为了提高代码的复用性和可维护性,我在项目中广泛使用了Python的装饰器,来实现鉴权、计算函数运行时间、捕获异常、限速、超时重试、屏蔽词过滤等功能。

某天,当我完成这些装饰器的更新后,重新运行机器人,意外地发现:原本仅需3到5秒就能回复消息的机器人,竟需要等待十几二十秒才能响应。这种明显的性能下降引起了我的注意。

初步排查

面对这个问题,我首先怀疑是程序内部逻辑出了问题。为了找到可能的异常,我在与消息处理相关的各个位置添加了日志。然而,日志显示程序运行正常,没有任何异常信息。

接下来,我怀疑是否是系统资源瓶颈导致的。于是,我检查了电脑的内存占用和硬盘I/O,结果它们都在正常范围内。而且,项目使用的是以高性能著称的MongoDB数据库,理论上应该不会成为瓶颈。

为了彻底排除数据库的问题,我尝试在消息数据和数据库之间增加一层缓存,直接使用内存来保存消息队列。然而,这并没有改善速度,机器人回复仍然需要十几秒。

灵光一闪

一系列的排查未能找到问题所在,让我一度陷入迷茫。突然,我灵光一闪,想到可以设计一个新的装饰器,用来计算函数的执行时间,从而精确定位问题。

于是,我编写了一个计算函数执行时间的装饰器,并将其应用在所有涉及网络通信、I/O操作、内存读写、进程间通信和线程切换的函数上,逐一排查性能瓶颈。

1
2
3
4
5
6
7
8
9
10
11
12
def async_timed():
"""计算异步函数执行时间的装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = asyncio.get_event_loop().time()
result = await func(*args, **kwargs)
end = asyncio.get_event_loop().time()
logger.debug(f"函数 {func.__name__} 执行时间: {end - start:.2f} 秒")
return result
return wrapper
return decorator

通过这个装饰器,我发现网络通信部分正常,耗时在3到5秒之间;其他与计算机相关的操作耗时也很短。真正的问题出现在消息的处理和发送部分。

问题的根源

原来,为了控制机器人发消息的速度,防止被服务器认为是刷屏,我在处理和发送消息的函数上添加了一个限速装饰器。最初的限速装饰器实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def rate_limit(calls: int, period: float):
"""限速装饰器"""
semaphore = asyncio.Semaphore(calls)

def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with semaphore:
result = await func(*args, **kwargs)
await asyncio.sleep(period / calls)
return result
return wrapper
return decorator

这个实现存在一个严重问题:在每次函数执行后,都强制

await asyncio.sleep(period / calls)

,这会导致函数阻塞,从而严重影响了机器人的响应速度。

解决方案

意识到问题所在后,我决定重新设计限速装饰器,采用更为高效的令牌桶算法(Token Bucket Algorithm),以避免阻塞。

新的限速装饰器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import asyncio
from functools import wraps

def rate_limit(calls: int, period: float):
"""限速装饰器,使用令牌桶算法控制调用频率"""
max_tokens = calls # 令牌桶的容量
tokens = calls # 当前令牌数
refill_time = period / calls # 每个令牌的填充时间
last_check = time.time() # 上次检查的时间
lock = asyncio.Lock() # 异步锁,确保线程安全

def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
nonlocal tokens, last_check
async with lock:
current = time.time()
# 计算自上次检查后应添加的令牌数
elapsed = current - last_check
refill = elapsed / refill_time
if refill > 0:
tokens = min(tokens + refill, max_tokens)
last_check = current
if tokens >= 1:
tokens -= 1
else:
# 计算需要等待的时间
wait_time = refill_time - elapsed % refill_time
await asyncio.sleep(wait_time)
return await wrapper(*args, **kwargs)
return await func(*args, **kwargs)
return wrapper
return decorator

算法详解

  1. 变量初始化

    • max_tokens:令牌桶的最大容量。
    • tokens:当前令牌数,初始为满值。
    • refill_time:每个令牌的重新填充时间。
    • last_check:上次检查令牌的时间。
    • lock:异步锁,确保多协程环境下的线程安全。
  2. 令牌更新逻辑

    • 计算时间差 elapsed,根据时间差计算新增加的令牌数 refill
    • 更新当前令牌数 tokens,并确保不超过最大容量。
    • 更新 last_check 时间。
  3. 处理请求

    • 如果有足够的令牌(tokens >= 1),则扣除一个令牌,直接执行函数。
    • 如果令牌不足,则计算需要等待的时间 wait_time,异步等待后递归调用 wrapper

应用新的装饰器

将新的限速装饰器应用到消息发送函数上:

1
2
3
4
@rate_limit(calls=10, period=60)  # 限速,每分钟最多10次调用
async def send_msg(msg_type, number, msg, use_voice=False, is_error_message=False):
# 消息发送逻辑
# ...

这样,在不阻塞其他协程的情况下,成功地限制了消息发送的频率,机器人回复速度恢复正常。

进一步的装饰器应用

在项目中,我还使用了其他装饰器,如过滤敏感词、处理异常等。

过滤消息装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def filter_message(func):
"""过滤消息的装饰器"""
@wraps(func)
async def wrapper(message, *args, **kwargs):
if isinstance(message, dict):
content = message.get('text', '')
# 如果是系统消息,跳过检查
if message.get('role') == 'system':
return await func(message, *args, **kwargs)
else:
content = str(message)

blocked_word = word_filter.contains_blocked_word(content) # 你可以先把屏蔽词放进一个json或者yaml文件里,然后写一个读取的函数
if blocked_word:
logger.warning(f"检测到敏感词'{blocked_word}',消息已被过滤。")
return None # 或者返回特定的提示消息

return await func(message, *args, **kwargs)
return wrapper

该装饰器在消息处理函数执行前,检测消息内容是否包含敏感词,若包含则过滤掉,确保消息的合规性。

异常处理装饰器

1
2
3
4
5
6
7
8
9
10
11
def error_handler(func):
"""捕获并处理异常的装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"执行函数 {func.__name__} 时发生异常:{e}", exc_info=True)
# 根据需要返回特定的错误提示
return "抱歉,处理您的请求时出现错误。"
return wrapper

这个装饰器捕获异步函数执行过程中可能出现的异常,防止程序崩溃,并提供统一的错误处理机制。

鉴权装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def admin_only(func):
"""管理员权限检查装饰器"""

@wraps(func)
async def wrapper(msg_type, user_info, *args, **kwargs):
user_id = user_info['user_id']
logger.info(f"Checking admin status for user: {user_id}")

if str(user_id) != str(config.ADMIN_ID):
logger.warning(f"Non-admin user {user_id} attempted to use admin command")
return "对不起,您没有执行此命令的权限。"
logger.info(f"Admin command executed by user: {user_id}")
return await func(msg_type, user_info, *args, **kwargs)
return wrapper

超时重试装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def retry(max_retries: int = 3, delay: float = 1.0):
"""重试装饰器"""

def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"函数 {func.__name__}{max_retries} 次尝试后仍然失败: {str(e)}")
raise
await asyncio.sleep(delay * (2 ** attempt)) # 指数退避
return wrapper
return decorator

深入理解Python的装饰器

通过这次实践,我对Python的装饰器有了更深入的理解。

装饰器的本质

装饰器本质上是一个返回函数的函数,它可以在不修改原函数代码的情况下,增加新的功能。装饰器通过对原函数进行包装,控制其输入和输出。

使用@wraps保持元数据

在编写装饰器时,使用 @functools.wraps(func) 可以保留原函数的元数据,如函数名、注释等。这对于调试和文档生成非常有用。

装饰器的顺序

当一个函数被多个装饰器装饰时,装饰器的应用顺序是自下而上,即最内层的装饰器先被应用。例如:

1
2
3
4
@decorator_a
@decorator_b
def func():
pass

等同于

func = decorator_a(decorator_b(func))

装饰器在异步函数中的应用

在异步编程中,装饰器需要兼容 async/await 语法。例如,装饰器内部的函数需要使用 async def 定义,并在适当的位置使用 await

总结

这次乌龙事件让我深刻体会到细节对程序性能的影响。通过重新设计限速装饰器,采用更合理的算法,不仅解决了性能问题,还加深了我对Python装饰器的理解。

在开发过程中,使用装饰器可以提高代码的可读性和可维护性,但也需要谨慎,确保装饰器的实现不会引入新的问题。

最后,希望我的分享能对大家有所帮助。如果你也有类似的经验或想法,欢迎在评论区与我交流!