5.2 死信交换机

一、初识死信交换机

  1. 当队列中的消息满足满足下列情况之一时,可以称为死信(dead letter):

    1. 消费者使用basic.reject或basic.nack声明消费失败,而且消息的requeue参数设置为false

    2. 消息是一个过期消息,超时无人消费

    3. 要投递的队列消息堆积满了,最早的消息可能成为死信

  2. 如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

  3. 如何给队列绑定死信交换机

    1. 给队列设置dead-letter-exchange属性,指定一个交换机

    2. 给队列上设置dead-letter-routing-key属性,设置死信交换机与死信队列的RutingKey

二、TTL

  1. TTL,也就是Time-To-Live,如果一个队列中的消息TTL结束仍未消费,则会变成死信,TTL超时分为两种情况:

    1. 消息所在的队列设置了存活时间

    2. 消息本身设置了存活时间

  2. 声明一组死信交换机和队列,基于注解方式:

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "dl.queue", durable = "true"),
                exchange = @Exchange(name = "dl.direct"),
                key = "dl"
        ))
        public void listenDlQueue(String msg) {
            System.out.println("接收到了dl.queue的延迟消息");
        }

    备注:消费者中,声明死信交换机以及队列,在监听消息的方法中

  3. 给队列设施超时时间,并指定死信交换机,需要在声明队列的时候

        @Bean
        public DirectExchange ttlDirectExchange() {
            return new DirectExchange("ttl.direct");
        }
    
        @Bean
        public Queue ttlQueue() {
            return QueueBuilder
                    .durable("ttl.queue")  // 指定队列名称,并持久化
                    .ttl(10000)    // 设置队列的超时时间,10秒
                    .deadLetterExchange("dl.direct")   // 指定死信交换机
                    .deadLetterRoutingKey("dl")   // 指定死信RoutingKey
                    .build();
        }
        
        @Bean
        public Binding ttlBinding() {
            return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
        }

    备注:声明正常队列,同时设置超时时间,并且绑定死信交换机

  4. 发送消息:

    发送消息从发送到ttl交换机以及队列中即可

  5. 发送消息时,给消息本身设置超时时间

            Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))  // msg是消息内容
                    .setExpiration("5000")  // 设置消息的超时时间,5s
                    .build();  // 开始构建

    备注:生产者创建消息的时候指定

  6. 队列与消息都有超时时间时,以超时时间短的时间为准

三、延迟队列

  1. 利用TTL结合死信交换机,实现了消息发送后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式,应用场景包括:

    1. 延迟短信

    2. 用户下单,支付时间

    3. 预约会议,一段时间后通知所有人

  2. 延迟队列插件

    1. 安装方式:附录

  3. SpringAMQP使用延迟队列插件

    1. DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能,因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设置delayed属性为true即可

      基于注解方式(推荐):

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(name = "dl.queue", durable = "true"),
                  exchange = @Exchange(name = "dl.direct", delayed = "true"),
                  key = "dl"
          ))
          public void listenDlQueue(String msg) {
              System.out.println("接收到了dl.queue的延迟消息");
          }

      基于Bean注解方式:

          @Bean
          public DirectExchange delayedExchange() {
              return ExchangeBuilder
                  .directExchange("delay.direct")  // 指定交换机类型和名称
                  .delayed()  // 设置delay属性为true
                  .durable(true)  // 设置持久化
                  .build();
          }
      
          @Bean
          public Queue delayedQueue() {
              return new Queue("delay.queue");
          }
          
          @Bean
          public Binding delayedBinding() {
              return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
          }
    2. 发送消息(设置延迟时间):

      // 创建消息
      Message message = MessageBuilder
          	.withBody("消息内容".getBytes(StandardCharsets.UTF_8))
          	.setHeader("x-delat", 10000)
          	.build();

      备注:设置头信息x-delat就是延迟消息,后面就是延迟时间,单位ms

      因为是延迟消息,MQ会误报消息发送到交换机失败,因此需要在全局的发送确认机制处做一个校验,校验返回的错误内容中receivedDelay是否有值存在,存在,则说明是延迟队列,否则投递失败

      @Configuration
      @Slf4j
      public class CommonConfig implements ApplicationContextAware {
      
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              // 获取RabbitTemplate对象
              RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
              // 配置ReturnCallback
              rabbitTemplate.setReturnsCallback(returnedMessage -> {
                  // 判断是否是延迟消息
                  if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() <= 0) {
                      // 记录日志
                      log.info("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                              returnedMessage.getReplyCode(),
                              returnedMessage.getReplyText(),
                              returnedMessage.getExchange(),
                              returnedMessage.getRoutingKey(),
                              returnedMessage.getMessage().toString());
                      // 其他操作:消息重发等
                      // Todo
                  } else {
      
                      // 是一个延迟队列,可以忽略
                  }
              });
          }
      
      }

      改造后的全局捕获,支持延迟消息

最后更新于

这有帮助吗?