在日常的服务器维护工作中,经常会遇到系统响应变慢、任务堆积、服务之间调用混乱的问题。比如某个电商平台在大促期间,订单一下涌进来,订单服务还没处理完,库存服务已经被压垮了。这时候光靠加机器解决不了根本问题,得从服务端架构上想办法。
为什么需要消息队列
想象一下邮局寄信的过程:你把信投进邮箱,邮局负责后续的分拣和投递,你不需要盯着每一封信怎么走。消息队列就是系统的“邮箱”。服务之间不直接调用,而是把任务扔进队列,由消费者慢慢处理。这样生产者不用等结果,系统解耦了,压力也分散了。
常见的场景比如用户注册后发邮件、生成日志异步写入、订单创建后触发积分更新。这些操作都不需要立刻完成,完全可以丢到消息队列里排队处理。
常见的消息队列选型
Kafka 适合高吞吐的日志类场景,RabbitMQ 在企业级应用中更灵活,RocketMQ 常用于金融级业务,而 Redis 的 list 结构也能实现简单的队列功能。选择哪个要看实际需求。比如小项目用 Redis 就够了,大系统就得上 Kafka 或 RocketMQ。
代码示例:用 RabbitMQ 发送消息
下面是一个简单的 Python 示例,展示如何用 pika 库发送一条消息到 RabbitMQ:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = "Hello, this is a task!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print(f"[x] Sent {message}")
connection.close()
消费端如何处理任务
消费者一直监听队列,有消息就处理。处理完再手动确认,避免任务丢失。如果某次处理失败,消息还能重新入队或进入死信队列排查问题。
def callback(ch, method, properties, body):
print(f"[x] Received {body.decode()}")
# 模拟处理耗时
import time
time.sleep(1)
print("[x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
运维中的注意事项
消息队列不是万能药。用不好反而会带来新问题。比如队列积压太多,可能是消费者太慢或者挂了;消息重复消费,业务要支持幂等;网络抖动可能导致连接断开,得加上重连机制。
定期查看队列长度、消费速率、失败日志,是服务器维护的常规动作。Kafka 的 lag 监控、RabbitMQ 的管理界面都能帮你快速定位问题。
另外,别忘了设置合理的超时和重试策略。有些任务重试五次还不行,就应该进人工处理队列,而不是一直卡着。