如何保障RabbitMQ的消息可靠性
编辑在现实业务中,通常需要使用消息队列(MQ)进行异步操作以及应用程序间的解耦
由于应用之间的通讯依靠MQ,所以,确保MQ消息的可靠性就尤为重要
在Java开发中,往往都采用RabbitMQ或Kafka,本文将以RabbitMQ举例,研究如何确保消息的可靠性。
图片来源网络
上图是RabbitMQ消息传递的示意图,大致可以看成三个角色,生产者(Producer)
; 消息队列(MQ)
; 消费者(Consumer)
在RabbitMQ中,生产者将消息 推送至MQ的交换机(Exchange),有交换机根据指定的路由算法将消息转发到指定队列,最后在和消费者建立连接后,MQ将消息投递给消费者
我们按顺序
保障生产者的可靠性
在生产者向MQ推送消息时,由于网络或是MQ服务异常等原因,可能会出现消息推送失败的情况
在RabbitMQ中,提供了三种机制用于应对这类问题:生产者重连机制
,生产者确认机制
,生产者事务机制
生产者重连机制
生产者重连机制,指的是当生产者无法连接到MQ时,重新尝试与MQ建立连接的一种机制
当网络不稳定时,生产者重连机制可以提高消息推送的成功率
在SpringAMQP中,重连机制是一种堵塞式的重试,在重试等待的这个过程中,整个线程将会被堵塞
开启生产者重连机制可能会对程序性能生产影响,可以考虑结合业务场景调整等待时间或重试次数,也可使用多线程优化程序的性能
在SpringAMQP中开启的方法也比较简单,我们只需要在配置文件中设置即可
spring:
rabbitmq:
connection-timeout: 1s #连接超时时间,到达该时间认定为推送超时
template:
retry:
enabled: true #启用失败重试(连接失败时)
initial-interval: 1s #失败后重试等待的初始时间
multiplier: 2 #重试的等待时长倍数(也就是下一次重试的等待时长,是上一次重试的几倍)
max-attempts: 5 #最大尝试次数(重试次数+1)
我们就按这个配置,做一个实验
07-30 13:59:25:681 INFO 3336 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
07-30 13:59:26:686 INFO 3336 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
07-30 13:59:28:689 INFO 3336 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
07-30 13:59:32:692 INFO 3336 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
07-30 13:59:40:695 INFO 3336 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5673]
org.springframework.amqp.AmqpConnectException: java.net.ConnectException: 拒绝连接 (Connection refused)
..........
可以发现,配置项中的initial-interval
只是第一次重试后的等待时间
第一次连接由13:59:25
发出
由于我手动关闭了服务器,所以1秒后,也就是13:59:26
又发起了一次连接
第三次受倍数(multiplier 项)影响,在间隔2秒后也就是13:59:28
发起了重试连接
直到连接总次数到达5次,也就是max-attempts设置的次数,不再重试,抛出异常
生产者确认机制
生产者确认机制,其核心思想实在MQ收到生产者推送的消息后,返回给生产者一个消息回执
这就像寄快递,寄件人将包裹交给小哥,小哥在受到快递后,交完钱,要求没问题小哥就会告诉你:可以了,你可以走了
回执的消息有以下两种:
当消息成功的推送至MQ,但由于交换机(Exchange)配置错误,导致消息路由异常,此时也会返回ACK,但会带上路由异常的原因
这通常都是由开发/运维人员配置错误导致,不算常见
结合上没有回执的情况,也就是有三种情况,要是没受到如何回执,生产者通常会重新发送
消费者的确认机制,其实可以细分为以下两个: Publisher Confirm 和 Publisher Returns
Publisher Confirm
Publisher Confirm可以确保消息被RabbitMQ正确接收并存储在队列中
开启的方法也比较简单,我们先修改配置文件:
spring:
rabbitmq:
#对影响性能大,非必要勿开
publisher-confirm-type: correlated #none 关闭生产者确认;simple 同步阻塞方式;correlated 异步回调方式
Publisher Confirm有三种模式:
none: 关闭Publisher Confirm
simple: 同步确认模式,生产者在发送消息后等待RabbitMQ的确认消息。只有在收到确认后,生产者才会继续执行后续操作
correlated(推荐): 异步确认模式,生产者在发送消息后立即继续执行其他任务,通过回调函数处理MQ的回执
然后我们需要在推送消息时,单独指定回调
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
private CorrelationData getDefaultCorrelationData() {
//创建CorrelationData
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
//给Future添加Callback
//我这里是两个函数,其实这里可以还可以传SuccessCallback,FailureCallback
cd.getFuture().addCallback(
//Success
result -> {
if (result.isAck()) {
log.debug("消息发送成功,收到ACK, Reason:{}", result.getReason());
} else {
log.error("消息发送失败,收到NACK, Reason:{}", result.getReason());
//实际业务可能要在这重发消息
}
},
//Failure,这里和MQ无关,是Spring在内部调用时出错,通常不会触发
ex -> log.error("消息回调失败", ex)
);
return cd;
}
@Test
public void testConfirmCallback() throws InterruptedException {
CorrelationData cd = getDefaultCorrelationData();
String exchange = "test.direct1"; //这里我故意填错
String msg = "Hello_Canyue";
rabbitTemplate.convertAndSend(exchange, "red", msg, cd);
Thread.sleep(2000); //等回调处理
}
}
这里我故意填错交换机名称,看看日志输出:
07-30 15:18:04:272 ERROR 4722 --- [ 127.0.0.1:5673] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test.direct1' in vhost '/', class-id=60, method-id=40)
07-30 15:18:04:276 ERROR 4722 --- [nectionFactory2] com.itheima.publisher.SpringAmqpTest : 消息发送失败,收到NACK, Reason:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test.direct1' in vhost '/', class-id=60, method-id=40)
Reason中会带上错误信息
我们改成正确的看看
07-30 15:20:46:997 DEBUG 4836 --- [ 127.0.0.1:5673] com.itheima.publisher.SpringAmqpTest : 消息发送成功,收到ACK, Reason:null
Publisher Returns
Publisher Returns可以确保消息能够被正确路由到目标队列
通常路由失败是由开发/运维人员的错误操作引起,通常不开启
开启的方法也很简单,先修改配置文件:
spring:
rabbitmq:
#对影响性能大,非必要勿开
publisher-returns: true #开启路由失败返回
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置,我们编写个配置项类:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
//配置回调
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.debug("收到return callback: exchange:{},key:{},msg:{},replyCode:{},replyText:{}",
returned.getExchange(), returned.getRoutingKey(),
returned.getMessage(), returned.getReplyCode(),returned.getReplyText());
}
});
}
}
配置一个交换机test.direct,与队列的绑定如下
写一个测试方法,我们这次故意设置一个不存在的Routing Key,使其发生路由错误
import ...
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testRouterError(){
String exchange = "test.direct";
String msg = "Hello_Canyue";
rabbitTemplate.convertAndSend(exchange,"green",msg);
}
}
看看日志输入:
07-30 15:06:50:767 DEBUG 4369 --- [nectionFactory1] c.i.publisher.config.MqConfirmConfig : 收到return callback: exchange:test.direct,key:green,msg:(Body:'"Hello_Canyue"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),replyCode:312,replyText:NO_ROUTE
生产者事务机制
在RabbitMQ中,生产者事务机制是一种确保消息原子性发送的方法。
也就意味着,如果事务中有任何一条消息投递失败,那么事务中的所有消息都会被回滚,即都不会被RabbitMQ接收。
我们来做个实验(未成功):
先再创建一个配置文件,开启事务管理
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqTransactionConfig {
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
编写一个消费者,监听队列的消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue1",declare = "true"),
exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void directReceive1(String msg) throws InterruptedException {
System.out.printf("消费者1,监听到direct.queue1的消息:[%s]\n",msg);
}
}
我们先测试,事务中有投递失败的情况
@Test
@Transactional
public void testTransactional(){
String exchange = "test.direct";
// 将信道设置为事务模式
rabbitTemplate.setChannelTransacted(true);
//我能被成功路由
rabbitTemplate.convertAndSend(exchange,"blue","Hello_blue");
//我不能能被成功路由
// rabbitTemplate.convertAndSend(exchange,"greed","Hello_green");
}
本试验失败,待研究
不知道为什么,我的事务即使没在都能推送的情况下一直在回滚
保障消息在MQ中的可靠性
RabbitMQ为确保性能,默认将收到的消息存放在内存中
若此时MQ宕机,就有可导致内存中的消息丢失
且当内存满时,若消费者无法及时消耗队列中的消息,可能会导致消息的积压,引发MQ堵塞
PageOut
这里先介绍一个概念
当内存占满后,MQ会执行PageOut操作,将部分消息持久至磁盘,以腾出跟多内存接受新的消息
但要注意,在执行PageOut操作的过程中,MQ处于阻塞状态,只有等到操作结束,MQ才能接收新消息
持久化对象
持久化对象主要是为了防止服务器重启/服务宕机等等情况,导致保存在内存中的消息丢失
数据持久化主要有三个实现
交换机与队列的持久化
交换机持久化和消息队列的持久化,其实可以把他理解为是否为一个临时的交换机/队列
持久化(Durable)的交换机/队列,在MQ重启后依旧存在,反之临时(Transient)的就将不存在
我们可以通过一下方式创建持久化的交换机/队列
通过控制台
控制台的方式比较简单,直接在创建时Durability选项选择为Durable就行,我的版本默认就是这个
通过代码声明
在实际开发中,可能更多是开发者在代码中声明,然后自动创建需要的交换机/队列
我们可以通过编写一个配置类实现,如下:
//import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <h1>在Java代码中声明交换机与队列</h1>
* 通常来说,在消费者端声明
*/
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
//也可以通过ExchangeBuilder
//return ExchangeBuilder.fanoutExchange("test.fanout2").build();
//params:name,durable,autoDelete(不再使用时是否自动删除)
//return new FanoutExchange("test.fanout2",false,true);
return new FanoutExchange("test.fanout2",true,false);
}
@Bean
public Queue fanoutQueue3() {
//durable[默认] 持久的:会被持久化到磁盘,
//return QueueBuilder.durable("fanout.queue3").build();
//params:name,durable,autoDelete(不再使用时是否自动删除)
return new Queue("fanout.queue3");
}
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue3)
.to(fanoutExchange);
}
}
也可以在监听器中用注解声明,如下:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue2",declare = "true"),
exchange = @Exchange(value = "test.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void directReceive2(String msg) throws InterruptedException {
System.out.printf("消费者2,监听到direct.queue2的消息:[%s]\n",msg);
}
这里要注意! 交换机/消息队列的持久化与消息的持久化是不同的概念
如果一个队列是持久化的,但消息不是持久化的,那么MQ重启后,队列虽仍然存在,但队列中的非持久化消息会丢失
如果一个交换机是持久化的,但与它绑定的队列不是持久化的,那么MQ重启后,交换机仍然存在,但与之绑定的非持久化队列会丢失
如果一个队列不是持久化的,但存在一条持久化的消息,在MQ重启后,队列将不存在,即使重新创建同名队列也找不到那条持久化的消息(至少我的实验结果是这样的)
也就是说,只有队列和消息都是持久化的,消息才会在重启后被保留
持久化的交换机/队列,在控制台中或有字母D标识,如下图
我标记的是一个非持久化的队列
消息的持久化
我们可以设置消息的Delivery mode
属性,指定消息是否是持久化的
默认Delivery mode为1,及存储在内存中,MQ重启可能会导致消息丢失,在内存不足时可能会因PageOut操作持久化到硬盘
当将Delivery mode设置为2时,也就是Persistent
,代表消息是持久化的,消息在到达MQ后就立即保存一份至硬盘
我们可以在控制台上看到消息的Delivery mode属性
不过在我们实际的开发中,更多实在生产者端用代码声明,如下:
@Test
public void testPersistentMessage() {
Message message = MessageBuilder
.withBody("\"Hello\"".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
// .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //非持久化
.build();
rabbitTemplate.convertAndSend("test.queue", message);
}
消息的持久化会增加IO,或多或少会降低MQ性能
但由于消息在进入MQ时就事先已经下落至硬盘,所以在触发PageOut时只需清除部分内存中的消息,可以降低PageOut的堵塞时间
LazyQueue
RabbitMQ 3.6及之后的版本中,引入了LazyQueue(惰性队列)
惰性队列具有以下特点
接受到的消息将直接存放至硬盘
RabbitMQ会在内存中缓存最新的几条消息
当消费者在消费时,若缓存未命中,RabbitMQ会从硬盘边读取
惰性队列可以有效降低队列的内存占用,适合于消息量不大,但消息到达频率很频繁的场景
Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.
要创建一个惰性队列,我们要在队列的参数(Arguments)中添加x-queue-mode
参数并将值设置为lazy
指定
若未指定,默认不启用惰性队列(好像3.12后默认启用)
我们可以在控制台添加:
当然,实际中更多还是在消费者端使用代码声明,以注解形式举例:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@RabbitListener(queuesToDeclare = @Queue(
name = "test.lazy",
declare = "true", //队列是否持久化
arguments = @Argument(name = "x-queue-mode",value = "lazy") //惰性队列
))
public void lazyQueueReceive(String msg) {
log.info("接收到 test.lazy的消息:{}",msg);
}
}
性能差异
LazyQueue的策略是尽可能将消息存放至硬盘,这可以尽可能避免应内存不足触发PageOut操作导致堵塞
这里我们对比大量写入的情况,先是不开启LazyQueue
再看看开启了LazyQueue
可以发现,LazyQueue虽然写入性能上较慢,但并没有触发PageOut
但由于大量的消息存放在硬盘,LazyQueue可能不适合大量频繁读取的场景
我们做个实验,使用同一个消费者同时消费50万条的消息,对比下消费者的耗时
生产50万条消息
import ...
@SpringBootTest
public class SpringAmqpTest {
@Test
public void lazyTimingExperiment() {
for (int i = 1; i < 500000; i++) {
rabbitTemplate.convertAndSend("test.lazy", String.valueOf(i));
}
rabbitTemplate.convertAndSend("test.lazy", "END"); //结束标识
}
}
消费者:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@Value("${canyue.test.startTime}")
private Long startTime;
@RabbitListener(queuesToDeclare = @Queue("test.lazy"))
public void lazyTimingExperiment(String msg){
if(startTime == -1){
startTime = System.currentTimeMillis();
}
if(msg.equals("END")){
long endTime = System.currentTimeMillis();
System.err.printf("消费者耗时:%d秒\n",(endTime-startTime) / 1000);
}
}
}
在不启用LazyQueue的情况下:
消费者耗时:12秒
在开启LazyQueue的情况下:
消费者耗时:14秒
这里有2秒的差距,其实在控制台中会更明显
当然,还有更多方式去保障消息在MQ中的可靠性
例如可以考虑极限队列,甚至可以在硬件层面入手,具体要根据业务需求调整
保障消费者的可靠性
我们还差最后一个环节,如何保障消费者相关的可靠性
先来讨论如何确保消费者成功接受到消息,且保障在处理成功前,消息不丢失
消费者确认机制
RabbitMQ提供了和生产者类似的消费者确认机制
消费者确认机制就像取快递,包裹里面的内容比较贵重,收件人(消费者)在拿到手上后,在快递员面前验货
小哥要在收件人检查没问题后,签字确认
而确认机制就要求,MQ向消费者推送消息,当消费者处理结束后,会向MQ发送一个回执告诉MQ消息处理的状态,回执有一下三种:
更多细节,官方文档讲的很详细:
其实在Spring AMQP中,已经替我们实现了消费者回执相关功能
我们可以先修改消费者的配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理
acknowledge
有三种模式:
none:不启用,不等消息处理,都直接返回ACK,这么做通常是为了提升性能
manual:手动发送,自行调用相关API,发送ACK/Nack/REJECT,可能存在业务入侵
auto:由Spring AMQP自动发送
业务正常:返回ACK
业务异常,返回NACK
消息处理/校验异常:返回REJECT
同样,我们可以在队列控制台中,看到这两个指标
Ready:待消费消息计数
Unacked:待消费者确认消息计数
自动处理(推荐)
我们来做个实验,手动写一个存在业务异常的消费者,返回NACK
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@RabbitListener(queuesToDeclare = @Queue("test.queue"))
public void testError(String msg){
log.info("消费者接收到消息:{}",msg);
//这书一个没被处理的异常
throw new RuntimeException("哎呀,出错了");
}
}
队列中有两条消息,[“1”,“2”],我们看控制台输出
....
08-03 09:01:07:278 INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
....
08-03 09:01:07:280 INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
....
08-03 09:01:07:283 INFO 5171 --- [ntContainer#6-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
....
可以看到,同一条消息发生了多次投递(其实这里会无限次一直投递),针对重复投递问题,结合下面消费者重试机制可以解决
手动处理
这是RabbitMQ提供的API,用于开发人员手动向RabbitMQ回执
用于传递确认的 API 方法通常在客户端库中以在通道上的操作的形式开放
手动处理需要开发者向业务逻辑中添加回执相关的代码,可能会造成业务侵入;但相对于自动处理,手动处理具有更高的灵活性
一些概念
在继续往下之前,我们需要先了解些基础概念
传递标签
在消费者向RabbitMQ注册后,会创建一个通道(Channel),MQ 会用 basic.deliver 方法向消费者推送消息
而传递标签(deliveryTag),用于唯一标识通道中的某一消息
传递标签是一个单调递增的正整数,消费者可以利用传递标签顺序处理消息,确保消息的顺序性
同时确认多条消息
为节省带宽,API允许同时确认多条消息
提供的部分方法带有multiple
参数,单设置为true
时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
发送回执
回归正题,让我们先修改配置文件:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理
先来看看这三个方法的定义
channel.basicAck
向MQ发送确认回执
/**
* 确认收到一个或多个消息
* @param deliveryTag 传递标签
* @param 是否同时确认多条消息
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
channel.basicNack
拒收一条或多条消息
你可以通过requeue参数指定拒收的消息是否回到队列重新投递(就像自动处理时发送的NACK)
当requeue为false时,若未配置死信交换机,消息将被丢弃
/**
* 拒收一条或多条消息
* @param deliveryTag 传递标签
* @param requeue 如果选择true,则拒收的消息将重新进入队列,false时若队列为配置死信交换机,消息将被丢弃
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
channel.basicReject
拒收一条消息
你可以通过requeue参数指定拒收的消息是否回到队列
当requeue为false时(就像自动处理的REJECT),若未配置死信交换机,消息将被丢弃
/**
* 拒收一条消息
* @param deliveryTag 传递标签
* @param requeue 如果选择true,则拒收的消息将重新进入队列,false时若队列为配置死信交换机,消息将被丢弃
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
相信看了定义应该也能看出使用方法
使用方法也很简单,我随便举个例子,具体要照业务实际开发
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@RabbitListener(queuesToDeclare = @Queue("test.queue"))
public void testManual(Message message, Channel channel) throws IOException {
//拿到DeliveryTag
long tag = message.getMessageProperties().getDeliveryTag();
String msg = "";
//下面这些应该按照实际业务需求编写
try{
msg = new String(message.getBody());
}catch (Exception e){
//requeue == false 不重发,若未配置死信队列,消息将被丢弃
channel.basicReject(tag,false);
}
String[] data = new String[2];
try{
data = msg.split(":");
}catch (Exception e){
//这里虽然回执NACK,但消息也不会重发
channel.basicNack(tag,true,false);
}
log.info("{}今年{}岁了",data[0],data[1]);
//ACk
channel.basicAck(tag,true);
}
}
更多相关内容,可以看官方文档:
恢复未确认的消息
发现RabbitMQ提供的API中有个basicRecover
方法
他可以让MQ重新发送未经消费者确认的消息,也就是在Unacked状态的消息
被重发的消息将放到对尾,通常在消费者因故障后重新连接时调用
/**
* 重新发送通道中处于未确认状态的消息
* @param 如果为 true,消息将被重新排队,并可能发送给不同的消费者。 如果为 false,消息将重新发送给相同的消费者。
*/
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
以上手动处理的内容,若在不使用Spring框架的项目中使用,可以参考官方的案例
https://rabbitmq.org.cn/docs/confirms#consumer-acks-api-elements
消费者重试机制
在之前消费者确认机制中,我们遇到了消息不断Requeue且又不断投递的情况
为解决MQ重复投递问题,我们可以再引入消费者的重试机制,限制同一条消息的最大重试次数
引入后,消息会在消费者本地进行重试(此时消息处于Unacked状态),当消息超出重试次数后,将其丢弃/转发到死信交换机
先修改配置文件:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 消费者确认机制,none:不启用,manual:手动发送ACK/REJECT,auto:由Spring自动处理
retry:
enabled: true #开启确认机制
initial-interval: 1000ms #失败后,重新等待的初始时间
multiplier: 1 #重试的等待时长倍数(也就是下一次重试的等待时长,是上一次重试的几倍)
max-attempts: 3 #最大尝试参数
stateless: true # 重试是是否不带上状态, true 不带状态; false 带状态 (用于业务中包含事务,等需包含上下文的情况)
我们在用那个存在异常业务的消费者看看
@RabbitListener(queuesToDeclare = @Queue("test.queue"))
public void testError(String msg){
log.info("消费者接收到消息:{}",msg);
//这是一个没被处理的异常
throw new RuntimeException("哎呀,出错了");
}
再看看日志:
08-04 14:58:19:999 INFO 12076 --- [ main] c.itheima.consumer.ConsumerApplication : Started ConsumerApplication in 13.868 seconds (JVM running for 14.496)
08-04 14:58:20:002 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
08-04 14:58:21:005 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
08-04 14:58:22:006 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:1
08-04 14:58:22:012 WARN 12076 --- [ntContainer#8-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'[B@60313bdd(byte[1])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=true, receivedExchange=, receivedRoutingKey=test.queue, deliveryTag=1, consumerTag=amq.ctag-MBpImdk1WYtudhowxjUtCA, consumerQueue=test.queue])
08-04 14:58:22:013 WARN 12076 --- [ntContainer#8-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
......
08-04 14:58:22:018 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:2
08-04 14:58:23:019 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:2
08-04 14:58:24:021 INFO 12076 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:2
08-04 14:58:24:023 WARN 12076 --- [ntContainer#8-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'[B@56697fa6(byte[1])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=true, receivedExchange=, receivedRoutingKey=test.queue, deliveryTag=2, consumerTag=amq.ctag-MBpImdk1WYtudhowxjUtCA, consumerQueue=test.queue])
08-04 14:58:24:026 WARN 12076 --- [ntContainer#8-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
诶,很明显,现在消息重试3次后,就不会再重新投递,而是被直接删除
但,直接删除过于卤莽,我们可不可以将出现问题的消息连同一些上下文消息收集起来,方便后续定位原因
这里,就要用到一个概念。叫死信
死信的处理
先来了解什么是死信
死信(Dead Letter): 是指那些由于某种原因无法正常处理或路由到目的地的消息
按照官网文档的说法,造成一条消息变为死信的原因有以下几种可能:
消费者使用
basic.reject
或basic.nack
方法,且设置requeue
为false
,对消息进行了负确认消息过期,也就消息在队列的时间超过配置的TTL
队列空间不足,最早的消息可能会成为死信
消息超出最大重试次数
如果整个队列过期,队列中的消息将不会被死信化。
将重试失败的消息转发到死信交换机
这里我们先只讨论刚刚消费者重试机制失败的消息
在开启消费者重试机制后,若消息重试次数耗尽,就需要有MessageRecoverer接口处理
接口有三个实现:
RejectAndDontRequeueRecoverer(默认): 重试耗尽后,发送REJECT回执,并让MQ丢弃消息
ImmediatRequeueMessageRecoverer: 重试耗尽后,返回NACK,让消息重新入队,相当于只降低了MQ重发频率
RepublishMessaheRecoverer: 重试耗尽后,将消息投递至指定的交换机,通常是专门的死信交换机
我们可以使用第三种方案,消息在投递至死信交换机时,会带上原消息的内容以及一些异常消息
到时候可以使用一个专门的消费者收集这些死信,方便开发者定位问题
这么看这更像是一种兜底方案
通常我们会在消费者端通过配置类声明:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
//只有在配置文件中开启重试机制,本配置类才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
//死信交换机
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
//死信队列
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
//绑定下
@Bean
public Binding errorBinding(){
return BindingBuilder
.bind(errorQueue())
.to(errorExchange())
.with("error");
}
//定义RepublishMessageRecoverer
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
我们还是用之前的带异常业务的消费者,重试三次,将消息投递到死信交换机
8-05 13:53:50:705 INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:我是一条死信
08-05 13:53:51:708 INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:我是一条死信
08-05 13:53:52:709 INFO 13650 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到消息:我是一条死信
08-05 13:53:52:719 WARN 13650 --- [ntContainer#8-1] o.s.a.r.retry.RepublishMessageRecoverer : Republishing failed message to exchange 'error.direct' with routing key error
可以看第三条日志,现在提示Republishing failed message to exchange,也就是这次将失败的消息重新投递到了指定的交换机
我们去控制台看看死信队列中的消息
不太好截,但还好,看个大概就行
可以发现,死信中而外包含了部分错误信息,这可以方便开发人员定位问题
更多细节可以看篇博文,写的很详细:
那有没有更通用的方法呢,往下看:
为队列绑定死信交换机
我们可以通过队列的dead-letter-exchange属性指定一个交换机,这个交换机也就被绑定为这个队列的死信交换机
当队列中出现死信时,会自动转发到绑定的那个死信交换机
我们先编写一个配置类,定义本次实验需要用到的交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TestDeadLetterConfiguration {
//先来定义正常业务的交换机和队列
@Bean
public DirectExchange testDirect2Exchange() {
return new DirectExchange("test.direct2", true, false);
}
@Bean
public Queue testQueue2(){
return QueueBuilder
.durable("test.queue2")
//这里,其实就相当于添加dead-letter-exchange属性
.deadLetterExchange("dlx.direct")
//相当于dead-letter-routing-key属性
.deadLetterRoutingKey("deadLetter")
.build();
}
@Bean
public Binding testBinding2(Queue testQueue2, DirectExchange testDirect2Exchange){
return BindingBuilder
.bind(testQueue2)
.to(testDirect2Exchange)
.with("test");
}
//再来定义死信的业务
@Bean
public DirectExchange dlxDirectExchange() {
return new DirectExchange("dlx.direct");
}
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("dlx.queue").build();
}
@Bean
public Binding dlxBinding() {
return BindingBuilder
.bind(dlxQueue())
.to(dlxDirectExchange())
.with("deadLetter");
}
}
队列test.direct2
绑定队列test.queue2
,但这次,我不打算消费这条队列
队列test.queue2
设置dead-letter-exchange
属性,指定死信交换机为dlx.direct
私信交换机dlx.direct
绑定死信队列dlx.queue
,到时候编写一个消费者监听
我们先来控制台看看,刚才创建的队列有没有添加相应的属性
OK,编写消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
@Slf4j
@Component
public class MqListeners {
@RabbitListener(queuesToDeclare = @Queue("dlx.queue"))
public void dlxExperiment(String msg){
log.warn("消费者接收到死信:{}",msg);
}
}
这次,打算在消息中设置TTL,等消息超时变为死信
@Test
public void testSendTTLMessage(){
Message message = MessageBuilder
.withBody("Hello".getBytes(StandardCharsets.UTF_8))
.setExpiration("1000") //单位毫秒
.build();
rabbitTemplate.convertAndSend("test.direct2","test",message);
}
看看日志:
08-06 15:47:09:997 WARN 18843 --- [ntContainer#8-1] c.i.consumer.listeners.MqListeners : 消费者接收到死信:Hello
OK,很满意
其实相同的操作,也可以用在实现MQ的延迟消息
参考资料:
https://www.rabbitmq.com/docs https://xie.infoq.cn/article/12c4bd997a7bd20985f2ddc00 https://www.bilibili.com/video/BV1mN4y1Z7t9/ https://springdoc.cn/spring-amqp https://blog.csdn.net/fckbb/article/details/139263348
- 0
-
赞助
微信支付宝 -
分享