残月的技术日志

残月的技术日志

如何保障RabbitMQ的消息可靠性

2024-08-05

在现实业务中,通常需要使用消息队列(MQ)进行异步操作以及应用程序间的解耦

由于应用之间的通讯依靠MQ,所以,确保MQ消息的可靠性就尤为重要

在Java开发中,往往都采用RabbitMQ或Kafka,本文将以RabbitMQ举例,研究如何确保消息的可靠性。

图片来源网络

上图是RabbitMQ消息传递的示意图,大致可以看成三个角色,生产者(Producer); 消息队列(MQ); 消费者(Consumer)

在RabbitMQ中,生产者将消息 推送至MQ的交换机(Exchange),有交换机根据指定的路由算法将消息转发到指定队列,最后在和消费者建立连接后,MQ将消息投递给消费者

我们按顺序

保障生产者的可靠性

在生产者向MQ推送消息时,由于网络或是MQ服务异常等原因,可能会出现消息推送失败的情况

在RabbitMQ中,提供了三种机制用于应对这类问题:生产者重连机制生产者确认机制生产者事务机制

生产者重连机制

生产者重连机制,指的是当生产者无法连接到MQ时,重新尝试与MQ建立连接的一种机制

当网络不稳定时,生产者重连机制可以提高消息推送的成功率

在SpringAMQP中,重连机制是一种堵塞式的重试,在重试等待的这个过程中,整个线程将会被堵塞

开启生产者重连机制可能会对程序性能生产影响,可以考虑结合业务场景调整等待时间或重试次数,也可使用多线程优化程序的性能

在SpringAMQP中开启的方法也比较简单,我们只需要在配置文件中设置即可

spring:
  rabbitmq:
    connection-timeout: 1s    #连接超时时间,到达该时间认定为推送超时
    template:
      retry:
        enabled: true    #启用失败重试(连接失败时)
        initial-interval: 1s   #失败后重试等待的初始时间
        multiplier: 2    #重试的等待时长倍数(也就是下一次重试的等待时长,是上一次重试的几倍)
        max-attempts: 5  #最大尝试次数(重试次数+1)

我们就按这个配置,做一个实验

07-30 13:59:25:681  INFO 3336 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673]
07-30 13:59:26:686  INFO 3336 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673]
07-30 13:59:28:689  INFO 3336 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673]
07-30 13:59:32:692  INFO 3336 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673]
07-30 13:59:40:695  INFO 3336 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5673]
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: 拒绝连接 (Connection refused)
..........

可以发现,配置项中的initial-interval 只是第一次重试后的等待时间

第一次连接由13:59:25发出

由于我手动关闭了服务器,所以1秒后,也就是13:59:26 又发起了一次连接

第三次受倍数(multiplier 项)影响,在间隔2秒后也就是13:59:28 发起了重试连接

直到连接总次数到达5次,也就是max-attempts设置的次数,不再重试,抛出异常

生产者确认机制

生产者确认机制,其核心思想实在MQ收到生产者推送的消息后,返回给生产者一个消息回执

这就像寄快递,寄件人将包裹交给小哥,小哥在受到快递后,交完钱,要求没问题小哥就会告诉你:可以了,你可以走了

回执的消息有以下两种:

消息

描述

可能出现的原因

ACK

推送成功

一切正常 / 已推送但MQ内部出现出错

NACK

推送成功

推送失败

当消息成功的推送至MQ,但由于交换机(Exchange)配置错误,导致消息路由异常,此时也会返回ACK,但会带上路由异常的原因

这通常都是由开发/运维人员配置错误导致,不算常见

结合上没有回执的情况,也就是有三种情况,要是没受到如何回执,生产者通常会重新发送

消费者的确认机制,其实可以细分为以下两个: Publisher Confirm 和 Publisher Returns

Publisher Confirm

Publisher Confirm可以确保消息被RabbitMQ正确接收并存储在队列中

开启的方法也比较简单,我们先修改配置文件:

spring:
  rabbitmq:
    #对影响性能大,非必要勿开
    publisher-confirm-type: correlated  #none 关闭生产者确认;simple 同步阻塞方式;correlated 异步回调方式

Publisher Confirm有三种模式:

  • none: 关闭Publisher Confirm

  • simple: 同步确认模式,生产者在发送消息后等待RabbitMQ的确认消息。只有在收到确认后,生产者才会继续执行后续操作

  • correlated(推荐): 异步确认模式,生产者在发送消息后立即继续执行其他任务,通过回调函数处理MQ的回执

然后我们需要在推送消息时,单独指定回调

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate; 

    private CorrelationData getDefaultCorrelationData() {
        //创建CorrelationData
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        //给Future添加Callback
        //我这里是两个函数,其实这里可以还可以传SuccessCallback,FailureCallback
        cd.getFuture().addCallback(
                //Success
                result -> {
                    if (result.isAck()) {
                        log.debug("消息发送成功,收到ACK, Reason:{}", result.getReason());
                    } else {
                        log.error("消息发送失败,收到NACK, Reason:{}", result.getReason());
                        //实际业务可能要在这重发消息
                    }
                },
                //Failure,这里和MQ无关,是Spring在内部调用时出错,通常不会触发
                ex -> log.error("消息回调失败", ex)
        );
        return cd;
    }

    @Test
    public void testConfirmCallback() throws InterruptedException {
        CorrelationData cd = getDefaultCorrelationData();
        String exchange = "test.direct1";    //这里我故意填错
        String msg = "Hello_Canyue";
        rabbitTemplate.convertAndSend(exchange, "red", msg, cd);
        Thread.sleep(2000);    //等回调处理
    }

}

这里我故意填错交换机名称,看看日志输出:

07-30 15:18:04:272 ERROR 4722 --- [ 127.0.0.1:5673] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test.direct1' in vhost '/', class-id=60, method-id=40)
07-30 15:18:04:276 ERROR 4722 --- [nectionFactory2] com.itheima.publisher.SpringAmqpTest     : 消息发送失败,收到NACK, Reason:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test.direct1' in vhost '/', class-id=60, method-id=40)

Reason中会带上错误信息

我们改成正确的看看

07-30 15:20:46:997 DEBUG 4836 --- [ 127.0.0.1:5673] com.itheima.publisher.SpringAmqpTest     : 消息发送成功,收到ACK, Reason:null

Publisher Returns

Publisher Returns可以确保消息能够被正确路由到目标队列

通常路由失败是由开发/运维人员的错误操作引起,通常不开启

开启的方法也很简单,先修改配置文件:

spring:
  rabbitmq:
    #对影响性能大,非必要勿开
    publisher-returns: true  #开启路由失败返回

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置,我们编写个配置项类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
        //配置回调
        template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.debug("收到return callback: exchange:{},key:{},msg:{},replyCode:{},replyText:{}",
                        returned.getExchange(), returned.getRoutingKey(),
                        returned.getMessage(), returned.getReplyCode(),returned.getReplyText());
            }
        });
    }
}

配置一个交换机test.direct,与队列的绑定如下

To

Routing Key

direct.queue1

red

direct.queue1

blue

写一个测试方法,我们这次故意设置一个不存在的Routing Key,使其发生路由错误

import ...

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;    

    @Test
    public void testRouterError(){
        String exchange = "test.direct";
        String msg = "Hello_Canyue";
        rabbitTemplate.convertAndSend(exchange,"green",msg);
    }
}

看看日志输入:

07-30 15:06:50:767 DEBUG 4369 --- [nectionFactory1] c.i.publisher.config.MqConfirmConfig     : 收到return callback: exchange:test.direct,key:green,msg:(Body:'"Hello_Canyue"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),replyCode:312,replyText:NO_ROUTE

生产者事务机制

在RabbitMQ中,生产者事务机制是一种确保消息原子性发送的方法。

也就意味着,如果事务中有任何一条消息投递失败,那么事务中的所有消息都会被回滚,即都不会被RabbitMQ接收。

我们来做个实验(未成功):

先再创建一个配置文件,开启事务管理

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqTransactionConfig {

    @Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

编写一个消费者,监听队列的消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MqListeners {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue1",declare = "true"),
            exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void directReceive1(String msg) throws InterruptedException {
        System.out.printf("消费者1,监听到direct.queue1的消息:[%s]\n",msg);
    }
}

我们先测试,事务中有投递失败的情况

    @Test
    @Transactional
    public void testTransactional(){
        String exchange = "test.direct";
        // 将信道设置为事务模式
        rabbitTemplate.setChannelTransacted(true);
        //我能被成功路由
        rabbitTemplate.convertAndSend(exchange,"blue","Hello_blue");
        //我不能能被成功路由
//        rabbitTemplate.convertAndSend(exchange,"greed","Hello_green");
    }

本试验失败,待研究

不知道为什么,我的事务即使没在都能推送的情况下一直在回滚

保障消息在MQ中的可靠性

RabbitMQ为确保性能,默认将收到的消息存放在内存中

若此时MQ宕机,就有可导致内存中的消息丢失

且当内存满时,若消费者无法及时消耗队列中的消息,可能会导致消息的积压,引发MQ堵塞

PageOut

这里先介绍一个概念

当内存占满后,MQ会执行PageOut操作,将部分消息持久至磁盘,以腾出跟多内存接受新的消息

但要注意,在执行PageOut操作的过程中,MQ处于阻塞状态,只有等到操作结束,MQ才能接收新消息

持久化对象

持久化对象主要是为了防止服务器重启/服务宕机等等情况,导致保存在内存中的消息丢失

数据持久化主要有三个实现

交换机与队列的持久化

交换机持久化和消息队列的持久化,其实可以把他理解为是否为一个临时的交换机/队列

持久化(Durable)的交换机/队列,在MQ重启后依旧存在,反之临时(Transient)的就将不存在

我们可以通过一下方式创建持久化的交换机/队列

  • 通过控制台

控制台的方式比较简单,直接在创建时Durability选项选择为Durable就行,我的版本默认就是这个

  • 通过代码声明

在实际开发中,可能更多是开发者在代码中声明,然后自动创建需要的交换机/队列

我们可以通过编写一个配置类实现,如下:

//import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * <h1>在Java代码中声明交换机与队列</h1>
 * 通常来说,在消费者端声明
 */
@Configuration
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange() {
        //也可以通过ExchangeBuilder
        //return ExchangeBuilder.fanoutExchange("test.fanout2").build();
        
        //params:name,durable,autoDelete(不再使用时是否自动删除)
        //return new FanoutExchange("test.fanout2",false,true);
        return new FanoutExchange("test.fanout2",true,false);
    }

    @Bean
    public Queue fanoutQueue3() {
        //durable[默认] 持久的:会被持久化到磁盘,
        //return QueueBuilder.durable("fanout.queue3").build();
        
        //params:name,durable,autoDelete(不再使用时是否自动删除)
        return new Queue("fanout.queue3");
    }

    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue3)
                .to(fanoutExchange);
    }

}

也可以在监听器中用注解声明,如下:

 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue2",declare = "true"),
            exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
public void directReceive2(String msg) throws InterruptedException {
    System.out.printf("消费者2,监听到direct.queue2的消息:[%s]\n",msg);
}

这里要注意! 交换机/消息队列的持久化与消息的持久化是不同的概念

如果一个队列是持久化的,但消息不是持久化的,那么MQ重启后,队列虽仍然存在,但队列中的非持久化消息会丢失

如果一个交换机是持久化的,但与它绑定的队列不是持久化的,那么MQ重启后,交换机仍然存在,但与之绑定的非持久化队列会丢失

如果一个队列不是持久化的,但存在一条持久化的消息,在MQ重启后,队列将不存在,即使重新创建同名队列也找不到那条持久化的消息(至少我的实验结果是这样的)

也就是说,只有队列和消息都是持久化的,消息才会在重启后被保留

持久化的交换机/队列,在控制台中或有字母D标识,如下图

我标记的是一个非持久化的队列

消息的持久化

我们可以设置消息的Delivery mode属性,指定消息是否是持久化的

默认Delivery mode为1,及存储在内存中,MQ重启可能会导致消息丢失,在内存不足时可能会因PageOut操作持久化到硬盘

当将Delivery mode设置为2时,也就是Persistent ,代表消息是持久化的,消息在到达MQ后就立即保存一份至硬盘

我们可以在控制台上看到消息的Delivery mode属性

不过在我们实际的开发中,更多实在生产者端用代码声明,如下:

@Test
public void testPersistentMessage() {
    Message message = MessageBuilder
        .withBody("\"Hello\"".getBytes(StandardCharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)   //持久化
        //  .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)   //非持久化
        .build();
    rabbitTemplate.convertAndSend("test.queue", message);
}

消息的持久化会增加IO,或多或少会降低MQ性能

但由于消息在进入MQ时就事先已经下落至硬盘,所以在触发PageOut时只需清除部分内存中的消息,可以降低PageOut的堵塞时间

LazyQueue

RabbitMQ 3.6及之后的版本中,引入了LazyQueue(惰性队列)

惰性队列具有以下特点

  • 接受到的消息将直接存放至硬盘

  • RabbitMQ会在内存中缓存最新的几条消息

  • 当消费者在消费时,若缓存未命中,RabbitMQ会从硬盘边读取

惰性队列可以有效降低队列的内存占用,适合于消息量不大,但消息到达频率很频繁的场景

Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.

要创建一个惰性队列,我们要在队列的参数(Arguments)中添加x-queue-mode参数并将值设置为lazy指定

若未指定,默认不启用惰性队列(好像3.12后默认启用)

我们可以在控制台添加:

当然,实际中更多还是在消费者端使用代码声明,以注解形式举例:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MqListeners {

    @RabbitListener(queuesToDeclare = @Queue(
            name = "test.lazy",
            declare = "true",    //队列是否持久化
            arguments = @Argument(name = "x-queue-mode",value = "lazy")  //惰性队列
    ))
    public void lazyQueueReceive(String msg) {
        log.info("接收到 test.lazy的消息:{}",msg);
    }

}

性能差异

LazyQueue的策略是尽可能将消息存放至硬盘,这可以尽可能避免应内存不足触发PageOut操作导致堵塞

这里我们对比大量写入的情况,先是不开启LazyQueue

再看看开启了LazyQueue

可以发现,LazyQueue虽然写入性能上较慢,但并没有触发PageOut

但由于大量的消息存放在硬盘,LazyQueue可能不适合大量频繁读取的场景

我们做个实验,使用同一个消费者同时消费50万条的消息,对比下消费者的耗时

生产50万条消息

import ...

@SpringBootTest
public class SpringAmqpTest {
    @Test
    public void lazyTimingExperiment() {
        for (int i = 1; i < 500000; i++) {
            rabbitTemplate.convertAndSend("test.lazy", String.valueOf(i));
        }
        rabbitTemplate.convertAndSend("test.lazy", "END");   //结束标识
    }
}

消费者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MqListeners {

    @Value("${canyue.test.startTime}")
    private Long startTime;

    @RabbitListener(queuesToDeclare = @Queue("test.lazy"))
    public void lazyTimingExperiment(String msg){
        if(startTime == -1){
            startTime = System.currentTimeMillis();
        }
        if(msg.equals("END")){
            long endTime = System.currentTimeMillis();
            System.err.printf("消费者耗时:%d秒\n",(endTime-startTime) / 1000);
        }
    }
}

在不启用LazyQueue的情况下:

消费者耗时:12秒

在开启LazyQueue的情况下:

消费者耗时:14秒

这里有2秒的差距,其实在控制台中会更明显

当然,还有更多方式去保障消息在MQ中的可靠性
例如可以考虑极限队列,甚至可以在硬件层面入手,具体要根据业务需求调整

保障消费者的可靠性

我们还差最后一个环节,如何保障消费者相关的可靠性

先来讨论如何确保消费者成功接受到消息,且保障在处理成功前,消息不丢失

消费者确认机制

RabbitMQ提供了和生产者类似的消费者确认机制

消费者确认机制就像取快递,包裹里面的内容比较贵重,收件人(消费者)在拿到手上后,在快递员面前验货

小哥要在收件人检查没问题后,签字确认

而确认机制就要求,MQ向消费者推送消息,当消费者处理结束后,会向MQ发送一个回执告诉MQ消息处理的状态,回执有一下三种:

消息

描述

可能出现的原因

ACK

消费者处理成功,MQ因从队列中删除该消息

消费者成功处理消息

NACK

消费者处理失败,MQ需重新投递

消费者处理失败

REJECT

消费者拒收消息,MQ因从队列中删除该消息

消息不符合消费者预期

更多细节,官方文档讲的很详细:

https://rabbitmq.org.cn/docs/confirms#acknowledgement-modes

其实在Spring AMQP中,已经替我们实现了消费者回执相关功能

我们可以先修改消费者的配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理

acknowledge有三种模式:

  • none:不启用,不等消息处理,都直接返回ACK,这么做通常是为了提升性能

  • manual:手动发送,自行调用相关API,发送ACK/Nack/REJECT,可能存在业务入侵

  • auto:由Spring AMQP自动发送

业务正常:返回ACK

业务异常,返回NACK

消息处理/校验异常:返回REJECT

同样,我们可以在队列控制台中,看到这两个指标

  • Ready:待消费消息计数

  • Unacked:待消费者确认消息计数

自动处理(推荐)

我们来做个实验,手动写一个存在业务异常的消费者,返回NACK

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MqListeners {

    @RabbitListener(queuesToDeclare = @Queue("test.queue"))
    public void testError(String msg){
        log.info("消费者接收到消息:{}",msg);
        //这书一个没被处理的异常
        throw new RuntimeException("哎呀,出错了");
    }
}

队列中有两条消息,[“1”,“2”],我们看控制台输出

....
08-03 09:01:07:278  INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
....
08-03 09:01:07:280  INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
....
08-03 09:01:07:283  INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
....

可以看到,同一条消息发生了多次投递(其实这里会无限次一直投递),针对重复投递问题,结合下面消费者重试机制可以解决

手动处理

这是RabbitMQ提供的API,用于开发人员手动向RabbitMQ回执

用于传递确认的 API 方法通常在客户端库中以在通道上的操作的形式开放

手动处理需要开发者向业务逻辑中添加回执相关的代码,可能会造成业务侵入;但相对于自动处理,手动处理具有更高的灵活性

一些概念

在继续往下之前,我们需要先了解些基础概念

传递标签

在消费者向RabbitMQ注册后,会创建一个通道(Channel),MQ 会用 basic.deliver 方法向消费者推送消息

而传递标签(deliveryTag),用于唯一标识通道中的某一消息

传递标签是一个单调递增的正整数,消费者可以利用传递标签顺序处理消息,确保消息的顺序性

同时确认多条消息

为节省带宽,API允许同时确认多条消息

提供的部分方法带有multiple 参数,单设置为true时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

发送回执

回归正题,让我们先修改配置文件:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理

先来看看这三个方法的定义

  • channel.basicAck

向MQ发送确认回执

/**
     * 确认收到一个或多个消息
     * @param deliveryTag 传递标签
     * @param 是否同时确认多条消息
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

  • channel.basicNack

拒收一条或多条消息

你可以通过requeue参数指定拒收的消息是否回到队列重新投递(就像自动处理时发送的NACK)

当requeue为false时,若未配置死信交换机,消息将被丢弃

    /**
     * 拒收一条或多条消息
     * @param deliveryTag 传递标签
     * @param requeue 如果选择true,则拒收的消息将重新进入队列,false时若队列为配置死信交换机,消息将被丢弃
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

  • channel.basicReject

拒收一条消息

你可以通过requeue参数指定拒收的消息是否回到队列

当requeue为false时(就像自动处理的REJECT),若未配置死信交换机,消息将被丢弃

    /**
     * 拒收一条消息
     * @param deliveryTag 传递标签
     * @param requeue 如果选择true,则拒收的消息将重新进入队列,false时若队列为配置死信交换机,消息将被丢弃
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

相信看了定义应该也能看出使用方法

使用方法也很简单,我随便举个例子,具体要照业务实际开发

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Slf4j
@Component
public class MqListeners {

    @RabbitListener(queuesToDeclare = @Queue("test.queue"))
    public void testManual(Message message, Channel channel) throws IOException {
        //拿到DeliveryTag
        long tag = message.getMessageProperties().getDeliveryTag();
        String msg = "";
        //下面这些应该按照实际业务需求编写
        try{
            msg = new String(message.getBody());
        }catch (Exception e){
            //requeue == false 不重发,若未配置死信队列,消息将被丢弃
            channel.basicReject(tag,false);
        }

        String[] data = new String[2];
        try{
            data = msg.split(":");
        }catch (Exception e){
            //这里虽然回执NACK,但消息也不会重发
            channel.basicNack(tag,true,false);
        }

        log.info("{}今年{}岁了",data[0],data[1]);
        //ACk
        channel.basicAck(tag,true);
    }
}

更多相关内容,可以看官方文档:

https://rabbitmq.org.cn/docs/confirms

恢复未确认的消息

发现RabbitMQ提供的API中有个basicRecover 方法

他可以让MQ重新发送未经消费者确认的消息,也就是在Unacked状态的消息

被重发的消息将放到对尾,通常在消费者因故障后重新连接时调用

/**
* 重新发送通道中处于未确认状态的消息
* @param 如果为 true,消息将被重新排队,并可能发送给不同的消费者。 如果为 false,消息将重新发送给相同的消费者。
*/
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

以上手动处理的内容,若在不使用Spring框架的项目中使用,可以参考官方的案例

https://rabbitmq.org.cn/docs/confirms#consumer-acks-api-elements

消费者重试机制

在之前消费者确认机制中,我们遇到了消息不断Requeue且又不断投递的情况

为解决MQ重复投递问题,我们可以再引入消费者的重试机制,限制同一条消息的最大重试次数

引入后,消息会在消费者本地进行重试(此时消息处于Unacked状态),当消息超出重试次数后,将其丢弃/转发到死信交换机

先修改配置文件:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理
        retry:
          enabled: true      #开启确认机制
          initial-interval: 1000ms    #失败后,重新等待的初始时间
          multiplier: 1      #重试的等待时长倍数(也就是下一次重试的等待时长,是上一次重试的几倍)
          max-attempts: 3    #最大尝试参数
          stateless: true    # 重试是是否不带上状态, true 不带状态; false 带状态 (用于业务中包含事务,等需包含上下文的情况)

我们在用那个存在异常业务的消费者看看

@RabbitListener(queuesToDeclare = @Queue("test.queue"))
public void testError(String msg){
    log.info("消费者接收到消息:{}",msg);
    //这是一个没被处理的异常
    throw new RuntimeException("哎呀,出错了");
    }

再看看日志:

08-04 14:58:19:999  INFO 12076 --- [           main] c.itheima.consumer.ConsumerApplication   : Started ConsumerApplication in 13.868 seconds (JVM running for 14.496)
08-04 14:58:20:002  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
08-04 14:58:21:005  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
08-04 14:58:22:006  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:1
08-04 14:58:22:012  WARN 12076 --- [ntContainer#8-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@60313bdd(byte[1])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=true, receivedExchange=, receivedRoutingKey=test.queue, deliveryTag=1, consumerTag=amq.ctag-MBpImdk1WYtudhowxjUtCA, consumerQueue=test.queue])
08-04 14:58:22:013  WARN 12076 --- [ntContainer#8-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
......
08-04 14:58:22:018  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:2
08-04 14:58:23:019  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:2
08-04 14:58:24:021  INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:2
08-04 14:58:24:023  WARN 12076 --- [ntContainer#8-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@56697fa6(byte[1])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=true, receivedExchange=, receivedRoutingKey=test.queue, deliveryTag=2, consumerTag=amq.ctag-MBpImdk1WYtudhowxjUtCA, consumerQueue=test.queue])
08-04 14:58:24:026  WARN 12076 --- [ntContainer#8-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

诶,很明显,现在消息重试3次后,就不会再重新投递,而是被直接删除

但,直接删除过于卤莽,我们可不可以将出现问题的消息连同一些上下文消息收集起来,方便后续定位原因

这里,就要用到一个概念。叫死信

死信的处理

先来了解什么是死信

死信(Dead Letter): 是指那些由于某种原因无法正常处理或路由到目的地的消息

按照官网文档的说法,造成一条消息变为死信的原因有以下几种可能:

  • 消费者使用basic.rejectbasic.nack方法,且设置requeuefalse,对消息进行了负确认

  • 消息过期,也就消息在队列的时间超过配置的TTL

  • 队列空间不足,最早的消息可能会成为死信

  • 消息超出最大重试次数

如果整个队列过期,队列中的消息将不会被死信化。

将重试失败的消息转发到死信交换机

这里我们先只讨论刚刚消费者重试机制失败的消息

在开启消费者重试机制后,若消息重试次数耗尽,就需要有MessageRecoverer接口处理

接口有三个实现:

  • RejectAndDontRequeueRecoverer(默认): 重试耗尽后,发送REJECT回执,并让MQ丢弃消息

  • ImmediatRequeueMessageRecoverer: 重试耗尽后,返回NACK,让消息重新入队,相当于只降低了MQ重发频率

  • RepublishMessaheRecoverer: 重试耗尽后,将消息投递至指定的交换机,通常是专门的死信交换机

我们可以使用第三种方案,消息在投递至死信交换机时,会带上原消息的内容以及一些异常消息

到时候可以使用一个专门的消费者收集这些死信,方便开发者定位问题

这么看这更像是一种兜底方案

通常我们会在消费者端通过配置类声明:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
//只有在配置文件中开启重试机制,本配置类才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
    //死信交换机
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    //死信队列
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    //绑定下
    @Bean
    public Binding errorBinding(){
        return BindingBuilder
                .bind(errorQueue())
                .to(errorExchange())
                .with("error");
    }

    //定义RepublishMessageRecoverer
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

我们还是用之前的带异常业务的消费者,重试三次,将消息投递到死信交换机

8-05 13:53:50:705  INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:我是一条死信
08-05 13:53:51:708  INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:我是一条死信
08-05 13:53:52:709  INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到消息:我是一条死信
08-05 13:53:52:719  WARN 13650 --- [ntContainer#8-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

可以看第三条日志,现在提示Republishing failed message to exchange,也就是这次将失败的消息重新投递到了指定的交换机

我们去控制台看看死信队列中的消息

不太好截,但还好,看个大概就行

可以发现,死信中而外包含了部分错误信息,这可以方便开发人员定位问题

更多细节可以看篇博文,写的很详细:

https://www.cnblogs.com/ybyn/p/13691058.html

那有没有更通用的方法呢,往下看:

为队列绑定死信交换机

我们可以通过队列的dead-letter-exchange属性指定一个交换机,这个交换机也就被绑定为这个队列的死信交换机

当队列中出现死信时,会自动转发到绑定的那个死信交换机

我们先编写一个配置类,定义本次实验需要用到的交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TestDeadLetterConfiguration {

    //先来定义正常业务的交换机和队列
    @Bean
    public DirectExchange testDirect2Exchange() {
        return new DirectExchange("test.direct2", true, false);
    }

    @Bean
    public Queue testQueue2(){
        return QueueBuilder
                .durable("test.queue2")
                //这里,其实就相当于添加dead-letter-exchange属性
                .deadLetterExchange("dlx.direct")
                //相当于dead-letter-routing-key属性
                .deadLetterRoutingKey("deadLetter")
                .build();
    }

    @Bean
    public Binding testBinding2(Queue testQueue2, DirectExchange testDirect2Exchange){
        return BindingBuilder
                .bind(testQueue2)
                .to(testDirect2Exchange)
                .with("test");
    }

    //再来定义死信的业务
    @Bean
    public DirectExchange dlxDirectExchange() {
        return new DirectExchange("dlx.direct");
    }

    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder
                .bind(dlxQueue())
                .to(dlxDirectExchange())
                .with("deadLetter");
    }
}

队列test.direct2绑定队列test.queue2,但这次,我不打算消费这条队列

队列test.queue2设置dead-letter-exchange属性,指定死信交换机为dlx.direct

私信交换机dlx.direct绑定死信队列dlx.queue ,到时候编写一个消费者监听

我们先来控制台看看,刚才创建的队列有没有添加相应的属性

OK,编写消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;

@Slf4j
@Component
public class MqListeners {
    @RabbitListener(queuesToDeclare = @Queue("dlx.queue"))
    public void dlxExperiment(String msg){
        log.warn("消费者接收到死信:{}",msg);
    }
}

这次,打算在消息中设置TTL,等消息超时变为死信

@Test
public void testSendTTLMessage(){
Message message = MessageBuilder
    .withBody("Hello".getBytes(StandardCharsets.UTF_8))
    .setExpiration("1000")   //单位毫秒
    .build();
    rabbitTemplate.convertAndSend("test.direct2","test",message);
}

看看日志:

08-06 15:47:09:997  WARN 18843 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners       : 消费者接收到死信:Hello

OK,很满意

其实相同的操作,也可以用在实现MQ的延迟消息

The End

参考资料:

https://www.rabbitmq.com/docshttps://xie.infoq.cn/article/12c4bd997a7bd20985f2ddc00https://www.bilibili.com/video/BV1mN4y1Z7t9/https://springdoc.cn/spring-amqphttps://blog.csdn.net/fckbb/article/details/139263348

  • 0