> 死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。 在说死信队列之前,我们先介绍下为什么需要用死信队列。 如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可。 ### ack机制和requeue-rejected属性 [RabbitMQ在SpringBoot中集成](https://www.cnblogs.com/bigdataZJ/p/springboot-rabbitmq.html) 在项目[springboot-demo](https://github.com/DMinerJackie/rome/tree/master/springboot/springboot-demo)我们看到application.yaml文件部分配置内容如下 ```java ... listener: type: simple simple: acknowledge-mode: auto concurrency: 5 default-requeue-rejected: true max-concurrency: 100 ... ``` 其中 **acknowledge-mode** 该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto。 none意味着没有任何的应答会被发送。 manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。 auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。 **default-requeue-rejected** 该配置项是决定由于监听器抛出异常而**拒绝的消息**是否被重新放回队列。默认值为true。 我一开始对于这个属性有个误解,我以为rejected是表示拒绝,所以将`requeue-rejected`连起来是拒绝重新放回队列,后来查了资料明白这个属性的功能才想起来rejected是个形容词,其表示的应该是**被拒绝的消息** 所以如果该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列。 下面我们看看acknowledge-mode参数和default-requeue-rejected参数使用不同的组合方式,RabbitMQ是如何处理消息的。 代码依然使用springboot-demo中的RabbitApplicationTests发送消息,使用Receiver类监听demo-queue队列的消息。 对于Receiver类添加了一行代码,该代码模拟抛出异常 ```java @Component public class Receiver { @RabbitListener(queues = "demo_queue") public void created(String message) { System.out.println("orignal message: " + message); int i = 1/0; } } ``` **acknowledge-mode=none, default-requeue-rejected=false**  该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。通过在RabbitMQ管理页面也没有看到重新放回队列的消息 **acknowledge-mode=none, default-requeue-rejected=true**  同样该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。而且即使default-requeue-rejected配置为true因为没有确认所以也没有看到重新放回队列的消息 **acknowledge-mode=manual, default-requeue-rejected=false**  该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,个人理解是因为没有收到ack,所以消息又回到了队列中。 **acknowledge-mode=manual, default-requeue-rejected=true**  该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,所以消息被重新放入到队列中了,并且在控制台发现还抛出了异常(**这块不是很清楚,default-requeue-rejected设置true和false带来的不同效果,有了解的麻烦下方留言指教**)。 **acknowledge-mode=auto, default-requeue-rejected=false**  该配置采用自动确认,从结果来看,是自动确认了。 从控制台打印的结果可以看出Receiver方法执行了3次,分别是前面两条放回队列的消息以及这次发送的消息,所以3条消息都消费了。 同时因为default-requeue-rejected设置为false,所以即使消费抛出异常,也没有将消息放回队列。 **acknowledge-mode=auto, default-requeue-rejected=true**  该配置同样采用自动确认,从结果看出,没有抛出异常(**这块也不是很理解**),且因为default-requeue-rejected设置为true,所以消息重新回到队列。 综上罗列这么多情况只为说明有些情况下,如果消息消费出错,因为配置问题导致消息丢失了。这在很多情况下是要命的,比如用户支付的订单号,如果因为抛异常等原因直接丢失是很要命的。 所以,我们需要有一个确保机制,能够保证即使失败的消息也能保存下来,这时候死信队列就排上用场了。 ### 死信队列 死信队列的整个设计思路是这样的 > 生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者 下面我们通过网上的一个简单的死信队列的实现看看如何使用死信队列。 ```java @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build(); } @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map args = new HashMap<>(2); // x-dead-letter-exchange 声明 死信交换机 args.put("x-dead-letter-exchange", "DL_EXCHANGE"); // x-dead-letter-routing-key 声明 死信路由键 args.put("x-dead-letter-routing-key", "KEY_R"); return QueueBuilder.durable("DL_QUEUE").withArguments(args).build(); } @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable("REDIRECT_QUEUE").build(); } /** * 死信路由通过 DL_KEY 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null); } /** * 死信路由通过 KEY_R 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null); } ``` **注意** * 声明了一个direct模式的exchange。 * 声明了一个死信队列deadLetterQueue,该队列配置了一些属性`x-dead-letter-exchange`表明死信交换机,`x-dead-letter-routing-key`表明死信路由键,因为是direct模式,所以需要设置这个路由键。 * 声明了一个替补队列redirectQueue,变成死信的消息最终就是存放在这个队列的。 * 声明绑定关系,分别是死信队列以及替补队列和交换机的绑定。 那么如何模拟生成一个死信消息呢,可以在发送到DL\_QUEUE的消息在10秒后失效,然后转发到替补队列中,代码实现如下 ```java public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); // 设置编码 messageProperties.setContentEncoding("utf-8"); // 设置过期时间10*1000毫秒 messageProperties.setExpiration("5000"); return message; }; rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor); } ``` 执行结果如下  消息首先进入DL\_QUEUE,5秒后失效,被转发到REDIRECT\_QUEUE中。 ### \# 概念: * **消息会变成死信消息的场景:** 1. 消息被`(basic.reject() or basic.nack()) and requeue = false`,即消息被消费者拒绝签收,并且重新入队为false。 1.1 有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了`nack`/`reject`。 2. 消息过期,过了TTL存活时间。 3. 队列设置了`x-max-length`最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。 * **代码编写流程是:** 1. 有一个(n个)正常业务的Exchange,比如为`user-exchange`。 2. 有一个(n个)正常业务的Queue,比如为`user-queue`。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:`x-dead-letter-exchange`,死信消息路由键:`x-dead-letter-routing-key`) 3. 进行正常业务的交换机和队列绑定。 4. 定义一个死信交换机,比如为`common-dead-letter-exchange`。 5. 将正常业务的队列绑定到死信交换机(队列设置了`x-dead-letter-exchange`即会自动绑定)。 6. 定义死信队列`user-dead-letter-queue`用于接收死信消息,绑定死信交换机。 * **业务流程是:** 1. 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。 2. 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。 3. 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。 4. 消息到达死信队列后,可监听该死信队列,处理死信消息。 * `死信交换机`、`死信队列`也是普通的交换机和队列,只不过是我们人为的将某个交换机和队列来处理死信消息。 * 流程图  图片.png ### \# 代码实现 1. 配置 ```java spring: application: name: learn-rabbitmq rabbitmq: host: localhost port: 5672 username: futao password: 123456789 virtual-host: deadletter-vh connection-timeout: 15000 # 发送确认 publisher-confirms: true # 路由失败回调 publisher-returns: true template: # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃 mandatory: true listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 签收模式为手动签收-那么需要在代码中手动ACK acknowledge-mode: manual app: rabbitmq: # 队列定义 queue: # 正常业务队列 user: user-queue # 死信队列 user-dead-letter: user-dead-letter-queue # 交换机定义 exchange: # 正常业务交换机 user: user-exchange # 死信交换机 common-dead-letter: common-dead-letter-exchange ``` 1. 队列、交换机定义与绑定。 ```java /** * 队列与交换机定义与绑定 * * @author futao * @date 2020/4/7. */ @Configuration public class Declare { /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") .build(); } /** * 用户交换机 * * @param userExchangeName 用户交换机名 * @return */ @Bean public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) { return ExchangeBuilder .topicExchange(userExchangeName) .durable(true) .build(); } /** * 用户队列与交换机绑定 * * @param userQueue 用户队列名 * @param userExchange 用户交换机名 * @return */ @Bean public Binding userBinding(Queue userQueue, Exchange userExchange) { return BindingBuilder .bind(userQueue) .to(userExchange) .with("user.*") .noargs(); } /** * 死信交换机 * * @param commonDeadLetterExchange 通用死信交换机名 * @return */ @Bean public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return ExchangeBuilder .topicExchange(commonDeadLetterExchange) .durable(true) .build(); } /** * 用户队列的死信消息 路由的队列 * 用户队列user-queue的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列 * 用这个队列来接收user-queue的死信消息 * * @return */ @Bean public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) { return QueueBuilder .durable(userDeadLetterQueue) .build(); } /** * 死信队列绑定死信交换机 * * @param userDeadLetterQueue user-queue对应的死信队列 * @param commonDeadLetterExchange 通用死信交换机 * @return */ @Bean public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) { return BindingBuilder .bind(userDeadLetterQueue) .to(commonDeadLetterExchange) .with("user-dead-letter-routing-key") .noargs(); } } ``` * 定义好之后启动程序,springboot会读取Spring容器中类型为Queue和Exchange的bean进行队列和交换机的初始化与绑定。当然也可以自己在RabbitMQ的管理后台进行手动创建与绑定。 * 查看管理后台  交换机  队列  队列与死信交换机 ### \# 测试 * 消息生产者 ```java /** * @author futao * @date 2020/4/7. */ @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send() { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); rabbitTemplate.convertAndSend(userExchange, "user.abc", user); } } ``` **1\. 场景1.1** > 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。 _nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以._ * 消费者代码 ```java /** * @author futao * @date 2020/4/9. */ @Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } } ``` * 可以看到,正常消息被NACK之后最后到了死信队列,且路由键发生了变化。  死信消息 **1\. 场景1.2** > 消费者设置了自动签收,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了`nack`/`reject`。 * application.yml中需要更改一些配置 ```java spring: application: name: learn-rabbitmq rabbitmq: listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 自动签收 acknowledge-mode: auto retry: enabled: true # 第一次尝试时间间隔 initial-interval: 10S # 两次尝试之间的最长持续时间。 max-interval: 10S # 最大重试次数(=第一次正常投递1+重试次数4) max-attempts: 5 # 上一次重试时间的乘数 multiplier: 1.0 ``` * 消费者代码 ```java /** * @author futao * @date 2020/4/9. */ @Slf4j @Configuration public class AutoAckConsumer { /** * 正常用户队列消息监听消费者 * * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); throw new RuntimeException("模拟发生异常"); } /** * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user) { log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user)); } } ``` * 测试结果:  image.png  image.png * 从测试结果可以看出,消息如果未被正常消费,则进行重试,如果最终还未被正常消费,则会被投递到死信队列。 _`initial-interval`,`max-interval`这两个参数啥作用不知道,现在测试的结果是一直都会取最短的那个时间作为下次投递时间..._ **2\. 测试场景 2** > 消息过期,过了TTL存活时间。 * 需要修改队列定义,设置队列消息的过期时间`x-message-ttl`. ```java /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列 .withArgument("x-message-ttl", 5000) .build(); } ``` * 把`user-queue`的消费者注释,使消息无法被消费,直到消息在队列中的时间达到设定的存活时间。  ttl * 根据日志可以看到,消息在5S后会被投递到死信队列。  image.png * **注意:可以给队列设置消息过期时间,那么所有投递到这个队列的消息都自动具有这个属性。还可以在消息投递之前,给每条消息设定指定的过期时间。(当两者都设置了,则默认取较短的值)** **下面测试给每条消息设置指定的过期时间:** * 修改消息生产者: ```java /** * @author futao * @date 2020/4/7. */ @Slf4j @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send(String exp) { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); log.info("消息投递...指定的存活时长为:[{}]ms", exp); rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); //为每条消息设定过期时间 messageProperties.setExpiration(exp); return message; } }); } } ```  image.png * 从测试结果可以看出,每条消息都在指定的时间投递到了死信队列。 **【坑】重点注意!!!:RabbitMQ对于消息过期的检测:只会检测最近将要被消费的那条消息是否到达了过期时间,不会检测非末端消息是否过期。造成的问题是:非末端消息已经过期了,但是因为末端消息还未过期,非末端消息处于阻塞状态,所以非末端消息不会被检测到已经过期。使业务产生与预期严重不一致的结果。** * 对上面的问题进行测试:(第一条消息的过期时间设置成10S,第二条消息设置成5S)  image.png * 从测试结果可以看出,id为1的消息存活时长为10S,id为2的消息存活时间为5S。但是只有当第一条消息(id=1)过期之后,id=2的消息到达队列末端,才会被检测到已经过期。 **3\. 测试场景3** > 队列设置了`x-max-length`最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。 * 修改队列定义 ```java /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //队列最大消息数量 .withArgument("x-max-length", 2) .build(); } ```  image.png * 向队列中投递消息  image.png * 从结果可以看出,当投递第3条消息的时候,RabbitMQ会把在最靠经被消费那一端的消息移出队列,并投递到死信队列。  image.png 队列中将始终保持最多两个消息。 ### \# 其他: * Queue的可配置项可在RabbitMQ的管理后台查看:  image.png * 源码:[https://github.com/FutaoSmile/springboot-learn-integration/tree/master/springboot-learn-rabbitmq](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2FFutaoSmile%2Fspringboot-learn-integration%2Ftree%2Fmaster%2Fspringboot-learn-rabbitmq) ### \# 相关: [SpringBoot RabbitMQ实现消息可靠投递](https://www.jianshu.com/p/432167bbe95f) 本文转自 [https://blog.csdn.net/u014748504/article/details/108147081](https://blog.csdn.net/u014748504/article/details/108147081),如有侵权,请联系删除。
评论