Vanson's Eternal Blog

MQ夯实基础

Mq basic.png
Published on
/21 mins read/---

MQ

Basic

死信队列

每业务队列独享路由 key + 死信队列

步骤说明代码/参数
① 业务队列绑定业务交换机,并设置 DLX & routing-keyx-dead-letter-exchange + x-dead-letter-routing-key
② 死信交换机可复用同一个(Direct 即可)exchange_declare('dlx.exchange', 'direct')
③ 死信队列每个业务队列独享一个,绑定 DLX 且路由 key 唯一queue_bind(queue='dlq.order.pay-ttl', exchange='dlx.exchange', routing_key='rk.order.pay-ttl')
import pika
 
conn = pika.BlockingConnection()
ch   = conn.channel()
 
# 1. 死信交换机(复用)
ch.exchange_declare('dlx.exchange', exchange_type='direct', durable=True)
 
# 2. 死信队列(每个业务独享)
dlq_name = 'dlq.order.pay-ttl'
ch.queue_declare(dlq_name, durable=True)
ch.queue_bind(dlq_name, 'dlx.exchange', routing_key='rk.order.pay-ttl')
 
# 3. 业务队列(带 DLX 配置)
biz_q = 'q.order.pay-ttl'
args = {
    'x-message-ttl': 30 * 60 * 1000,          # 30 分钟 TTL
    'x-dead-letter-exchange': 'dlx.exchange',
    'x-dead-letter-routing-key': 'rk.order.pay-ttl'
}
ch.queue_declare(biz_q, durable=True, arguments=args)
 
# 4. 死信消费者(监听死信队列)
def on_dlq(ch, method, props, body):
    order_id = body.decode()
    # 查询微信 → 关闭或补偿
    ch.basic_ack(method.delivery_tag)
 
ch.basic_consume(dlq_name, on_dlq, auto_ack=False)
ch.start_consuming()

场景:

  • 超时订单:TTL 30 min → DLQ → 查询微信 → 关闭或补偿。
  • 重试失败:basicNack(requeue=False) 直接进入对应 DLQ,避免业务队列阻塞。

消息可靠性

xiaoxikekaoxing-2025-07-21-11-04-54

消息丢失

产生原因

  • 网络故障:消息可能在传输过程中因网络问题而丢失。
  • RabbitMQ故障:如果RabbitMQ宕机,消息也可能丢失。

解决

开启事务机制

事务在RabbitMQ中可能会影响性能,因为它们需要在所有节点上同步状态。因此,RabbitMQ尽量避免使用事务。

private static void executeTransaction(Channel channel) throws IOException {
        boolean transactionSuccess = false;
        try {
            // 开启事务
            channel.txSelect(); 
 
            // 执行一系列消息操作,例如:channel.basicPublish(exchange, routingKey, message);
            // 提交事务
            channel.txCommit(); 
            transactionSuccess = true;
        } catch (ShutdownSignalException | IOException e) {
            // 回滚事务
            if (!transactionSuccess) {
                channel.txRollback(); 
            }
            throw e;
        }
    }

生产者确认机制

发布者确认机制允许发布者知道消息 是否已经被RabbitMQ成功接收:

public static void sendPersistentMessage(String host, String queueName, String message) {
        try (Connection connection = new ConnectionFactory().setHost(host).newConnection();
             Channel channel = connection.createChannel()) {
            // 启用发布者确认
            channel.confirmSelect(); 
 
           // 将消息设置为持久化
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2
                    .build();
                    
            // 添加确认监听器
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTagboolean multiplethrows IOException {
                    System.out.println("消息已确认: " + deliveryTag);
                    // 消息正确到达Broker时的处理逻辑
                }
 
                @Override
                public void handleNack(long deliveryTagboolean multiplethrows IOException {
                    System.out.println("消息未确认: " + deliveryTag);
                    // 因为内部错误导致消息丢失时的处理逻辑
                }
            });
 
            channel.basicPublish("", queueName, properties, message.getBytes());
 
            // 等待消息确认,或者超时
            boolean allConfirmed = channel.waitForConfirms();
            
            if (allConfirmed) {
                //所有消息都已确认
            } else {
                //超时或其它
            }
           
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
}

消息持久化在RabbitMQ中,消息的持久化它确保消息不仅存储在内存中,而且也安全地保存在磁盘上。

这样,即使在RabbitMQ服务崩溃或重启的情况下,消息也不会丢失,可以从磁盘恢复。

Exchange持久化:

// 设置 durable = true; 
channel.exchangeDeclare(exchangeName, "direct", durable);

消息持久化

// 设置 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Queue持久化

//设置 boolean durable = true;
channel.queueDeclare(queueName, durable, exclusive, false, null);

消费者确认机制

默认情况下,以下3种原因导致消息丢失:

  1. 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;
  2. 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;
  3. 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。

将自动ack机制改为手动ack机制

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        //接收消息,业务处理
        //设置手动确认
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        //发生异常时,可以选择重新发送消息或进行错误处理
        // 例如,可以选择负确认(nack),让消息重回队列
        // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
 

消息补偿机制

以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。

因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。如何做消息补偿呢?

其实就是将消息入库,通过定时任务重新发送失败的消息。

xiaoxibuchang-2025-07-21-11-03-53

  • 生产端发送消息;
  • 确认失败,将消息保存到数据库中,并设置初始状态0;
  • 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
  • 重发消息,可多次;重发成功,更新数据库:status=1;
  • 超过固定次数重发仍然失败,人工干预。

超过最大失败次数后,对于无法被正常消费的消息可移入死信队列。

可人工干预手动排查也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。

这里涉及到消息去重、幂等性处理等。

消息重复

产生原因

  • 网络问题:消费者处理完消息后,因网络问题导致确认信息未能成功发送回消息队列。
  • 服务中断:消费者在确认消息之前服务崩溃,消息队列未收到确认信号。
  • 确认机制:自动确认模式下,如果确认在消息处理完成前发生,消息可能会被重复消费

解决方案

幂等性设计

设计消费者的消息处理逻辑时,要保证即使消息被多次消费,也不会对系统状态产生不良影响。

幂等性可以通过以下方式实现:

  • 数据库唯一约束:使用数据库的主键约束或唯一索引防止插入重复记录。
  • 业务逻辑检查:在执行业务操作前,先检查是否已经处理过该消息。

消息去重策略

使用唯一标识符(如订单号、massageID)来识别消息,并在消费者中实现去重逻辑:

  • 缓存检查:使用内存缓存(如Redis)存储已处理的消息ID。
  • 持久化存储:将消息ID与处理状态保存在数据库中,以便跨服务重启后仍然有效。

手动确认与重试机制

通过手动确认消息,控制消息何时从队列中移除:

  • 手动确认:在消息成功处理后,显式调用channel.basicAck()方法确认消息。
  • 重试机制:如果消息处理失败,可以选择将消息重新入队(channel.basicReject(requeue=true))或丢弃(channel.basicReject(requeue=false))。

消费者端去重逻辑

@RabbitListener(queues = "queueName"acknowledgeMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {
    String messageId = message.getMessageProperties().getMessageId();
    
    // 检查消息是否已消费
    if (messageAlreadyProcessed(messageId)) {
        // 消息已消费,确认消息并返回
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        return;
    }
    
    // 处理消息
    try {
        processMessage(message);
        // 消息处理成功,持久化消息ID并确认消息
        persistMessageId(messageId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败,可以选择重新入队或丢弃
        boolean requeue = shouldRequeue(message);
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
    }
}
 

生产者端发布确认

void sendWithConfirm(AmqpTemplate amqpTemplate, Message message) throws IOException {
    ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (!ack) {
            // 处理消息发送失败的逻辑
            // ...
        }
    };
    amqpTemplate.setConfirmCallback(confirmCallback);
    amqpTemplate.convertAndSend("exchangeName""routingKey", message);
}

消息消费顺序性

在分布式系统中,消息队列(如 RabbitMQ)本身不保证消息的顺序消费,但在某些业务场景(如 订单创建 → 扣减库存)必须保证顺序执行。

以下是可行的解决方案及实现方式:

状态机 + 版本号(乐观锁)

适用场景:

  • 需要严格顺序执行,但允许短暂延迟(如订单创建后必须扣减库存)。
  • 适用于 事件溯源(Event Sourcing) 或 CQRS 架构。

消息体增加顺序标识:

{
  "msg_id": "order_123",
  "parent_msg_id": "create_order_123",  // 依赖的上一个消息ID
  "version": 2,                        // 版本号(用于乐观锁)
  "status": "pending",                 // 状态机(pending/processing/done)
  "data": {"order_id": 123, "action": "deduct_stock"}
}

消费者逻辑:

def consume_message(msg):
    # 1. 检查前置消息是否完成(如查询数据库或Redis)
    if not check_parent_msg_completed(msg["parent_msg_id"]):
        # 未完成则重新入队(或延迟重试)
        requeue_message(msg)
        return
 
    # 2. 乐观锁检查版本号
    if msg["version"] != get_current_version(msg["msg_id"]):
        return  # 已过期,丢弃
 
    # 3. 执行业务逻辑(如扣减库存)
    process_business(msg)
 
    # 4. 更新状态为完成
    update_msg_status(msg["msg_id"], "done")

消息堆积

快速诊断

诊断项命令/方法说明
1. 查看堆积数量rabbitmqctl list_queues name messages messages_ready messages_unacknowledged瞬时快照
2. 查看消费速率rabbitmqctl list_consumers -p <vhost>观察 ACK 速率
3. 查看节点资源rabbitmqctl statusCPU、内存、磁盘

通用解决策略

策略做法FastAPI 代码示例
① 扩容消费者水平增加 worker 进程celery -A proj worker -Q high -c 20
② 限流 & 预取设置 prefetch_count 防止一次性拿太多celery.conf.worker_prefetch_multiplier = 1
③ 优化业务逻辑减少单次处理耗时(缓存、批处理)见下方优化代码
④ 消息 TTL & DLQ加过期时间 + 死信队列兜底队列声明参数
⑤ 临时快速清空新建无逻辑消费者把消息落库见下方应急代码
⑥ 拆分队列按优先级/业务拆分,避免单队列瓶颈Queue('urgent'), Queue('normal')

生产

消费者突然下线

  • 压测还是有必要的
  • 消费者建议加上prefetchcount,对消费端限流,防止出现把消费者压跨
  • 一个Channel对应一个队列,万一Channel出现问题也只影响一个队列
  • 日志建议把AMQPClient异常整个打印出来,方便定位问题
  • 服务端的监控体系得建立起来

PRECONDITION_FAILED 406

消息已经被确认了,没法再不确认 deliveryTag 对应的消息了。

根因:一个消息只能被确认一次;自动确认 与 手动确认 互斥。

// consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量
channel.basicAck(deliveryTag, multiple);
 
// 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列,该方法reject后,该消费者还是会消费到该条被reject的消息
channel.basicReject(deliveryTag, requeue);
 
// 不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
channel.basicNack(deliveryTag, multiple, requeue);
 
// 是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(false);

需要注意的是,如果要使用手动 ACK,一定要关闭主动 ACK 功能。

spring:
  rabbitmq:
    host127.0.0.1
    port5672
    usernameguest
    passwordguest
    listener:
      typesimple
      simple:
        default-requeue-rejectedfalse
        acknowledge-modemanual

如果不设置spring.rabbitmq.listener.direct.acknowledge-mode=manual等相关手动 ACK 设置,就会发生 406 PRECONDITION_FAILED 异常,

即自动确认了一次消息后,又手动再次 ack 或拒绝消息。

这个错误信息:ERROR CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

通常表明在 RabbitMQ 的通信过程中出现了问题,导致通道(Channel)关闭。

可能原因

  • 手动ACK与自动ACK冲突:在 RabbitMQ 中,消息的确认(ACK)可以是手动的也可以是自动的。如果在配置中设置了自动 ACK,但在消费者代码中又手动调用了basicAck方法,就可能导致这个错误。这是因为 RabbitMQ 认为消息已经被处理,但客户端又尝试再次确认,造成了冲突。
  • 队列声明异常:如果 RabbitMQ 客户端尝试声明一个已经存在的队列或交换机,并且使用了不同的参数(如不同的交换机类型或队列属性),也可能导致这个错误。RabbitMQ不允许在不匹配的参数下重新声明队列或交换机。
  • 网络问题:网络不稳定或中断可能导致 RabbitMQ 客户端与服务器之间的连接出现问题,从而触发通道关闭。
  • RabbitMQ服务器配置问题:RabbitMQ 服务器的配置问题,如资源限制、权限设置不当等,也可能导致客户端无法正常通信。
  • 消费者处理消息超时:如果消费者处理消息的时间过长,超过了 RabbitMQ 服务器设置的超时时间,服务器可能会关闭通道。

修复方法

  • 检查ACK模式:确保在 RabbitMQ 的消费者配置中,ACK 模式是一致的。如果配置为手动 ACK,那么在消费者代码中必须显式调用basicAck方法。如果配置为自动 ACK,则不需要在代码中进行确认。
  • 检查队列和交换机声明:确保在客户端代码中声明的队列和交换机与 RabbitMQ 服务器上现有的队列和交换机参数一致。如果需要,可以在客户端代码中添加逻辑来检查队列或交换机是否存在,或者使用ignoreDeclarationExceptions属性来忽略声明异常。
  • 网络和服务器检查:检查网络连接是否稳定,以及 RabbitMQ 服务器是否正常运行。可以通过 RabbitMQ 的管理界面或命令行工具来检查服务器状态。
  • 调整RabbitMQ服务器配置:如果问题是由于服务器配置不当引起的,需要根据实际情况调整配置。例如,可以增加资源限制,或者调整权限设置。
  • 优化消费者处理逻辑:如果消费者处理消息的时间过长,可以尝试优化代码逻辑,减少处理时间。此外,可以考虑增加消费者数量来提高处理能力。
  • 监控和日志:增加日志记录,以便在出现问题时能够快速定位原因。同时,可以使用RabbitMQ的监控插件来监控系统状态,及时发现并解决问题。
  • 使用RabbitMQ的连接恢复机制:Spring Boot集成 RabbitMQ 时,可以使用CachingConnectionFactory的连接恢复机制。在application.properties或application.yml中设置spring.rabbitmq.connection-recovery-enabled=true,这样在连接出现问题时,RabbitMQ 客户端会自动尝试恢复连接。

消息阻塞

维度推荐做法代码/配置示例风险点
ACK 模式统一 手动 ACKack-mode=manualchannel.basicAck(tag)finally自动 ACK 下 QOS 失效
消息签收try-catch-finally 确保必签收python try: ... finally: ch.basicAck(tag)业务异常导致消息永远 UnACK
vhost 隔离每个业务项目独立 vhostrabbitmqctl add_vhost biz_vhost资源/权限冲突
死信队列设置 x-dead-letter-exchangearguments={'x-dead-letter-exchange': 'dlx'}消息丢失或无法追踪
队列长度限制x-max-lengthx-max-length-bytesqueue_declare(..., arguments={'x-max-length': 10000})超出即丢弃或进入 DLX
Exchange 选型Direct 最快(精确路由)exchange_declare(type='direct')Topic/Fanout 路由开销更大
监控告警队列深度、UnACK、连接数Prometheus + Grafana Dashboard无监控等于盲飞

必须使用手动 Ack 模式

# Python pika 示例
channel.basic_consume(
    queue='order_queue',
    on_message_callback=callback,
    auto_ack=False  # 关闭自动ACK
)
 
def callback(ch, method, properties, body):
    try:
        process_order(body)  # 业务处理
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 成功确认
    except Exception as e:
        logger.error(f"处理失败: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)  # 不重试,进入DLQ

生产环境隔离策略

VHost 隔离

# 创建业务专用VHost
rabbitmqctl add_vhost order_vhost
rabbitmqctl set_permissions -p order_vhost app_user ".*" ".*" ".*"

连接参数规范

params = pika.ConnectionParameters(
    host='rabbitmq.prod',
    virtual_host='order_vhost',  # 指定VHost
    credentials=pika.PlainCredentials('app_user', 'secure_password'),
    heartbeat=600,  # 心跳检测连接存活
    blocked_connection_timeout=30  # 网络阻塞超时
)

死信队列(DLX)标准化配置

声明死信交换机和队列

# 主队列声明时绑定DLX
channel.queue_declare(
    queue='order_queue',
    arguments={
        'x-dead-letter-exchange': 'dlx.order',
        'x-dead-letter-routing-key': 'failed.orders'
    }
)
 
# 死信队列声明
channel.exchange_declare('dlx.order', exchange_type='direct')
channel.queue_declare(queue='dead_letter.order')
channel.queue_bind('dead_letter.order', 'dlx.order', routing_key='failed.orders')

消费死信消息

def process_dlx(ch, method, properties, body):
    logger.error(f"死信消息: {body}, 原因: {properties.headers.get('x-death')}")
    alert_admin(body)  # 通知管理员
    ch.basic_ack(delivery_tag=method.delivery_tag)

队列限制与性能优化

队列保护参数

channel.queue_declare(
    queue='protected_queue',
    arguments={
        'x-max-length': 5000,          # 消息数限制
        'x-max-length-bytes': 10485760, # 10MB大小限制
        'x-overflow': 'reject-publish'  # 超限拒绝新消息
    }
)

参考:

RabbitMQ真实生产故障问题还原与分析

← Previous postMongoDB夯实基础
Next post →Nginx夯实基础