5.1 消息可靠性

一、MQ的一些常见问题

  1. 消息可靠性问题

    1. 如何确保发送的消息至少被消费一次

  2. 延迟消息问题

    1. 如何实现消息的延迟投递

  3. 消息堆积问题

    1. 如何解决数百万消息堆积,无法及时消费的问题

  4. 高可用问题

    1. 如何避免单点的MQ故障导致的不可用问题

二、消息可靠性问题

  1. 消息发送者发送到交换机(exchange),再到队列(queue),再到消费者,消息丢失的情况如下

    1. 发送时丢失

      1. 生产者发送的消息未到达exchange

      2. 消息到达exchange后未到达queue

    2. MQ宕机,queue将丢失消息

    3. 消费者收到消息后未消费就宕机

三、生产者消息确认

  1. RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,标识消息是否处理成功。结果有两种请求:

    1. publisher-confirm,发送者确认

      1. 消息成功投递到交换机,返回ack

      2. 消息未投递到交换机,返回nack

    2. publisher-return,发送者回执

      1. 消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因

    注意:确认机制发送消息时,需要个每一个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

  2. SpringAMQP实现生产者确认

    1. 在yml中添加配置

      spring:
        rabbitmq:
          publisher-confirm-type: correlated # 开启publisher-confirm,异步回调方式
          publisher-returns: true  # 开启publisher-returns
          template:  # 定义消息路由失败策略
            mandatory: true

      说明:

      publisher-confirm-type:开始publisher-confirm,这里支持两种类型

      ​ simple:同步等待confirm结果,直到超时

      ​ correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

      publisher-returns:开启publisher-returns功能,同样是基于callback机制,不过是定义ReturnCallback

      template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:直接丢弃消息

    2. 每一个TabbitTemplate只能配置一个ReturnCallback,因此需要在项目中配置:

      @Configuration
      @Slf4j
      public class CommonConfig implements ApplicationContextAware {
      
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              // 获取RabbitTemplate对象
              RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
              // 配置ReturnCallback
              rabbitTemplate.setReturnsCallback(returnedMessage -> {
                  // 记录日志
                  log.info("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                          returnedMessage.getReplyCode(),
                          returnedMessage.getReplyText(),
                          returnedMessage.getExchange(),
                          returnedMessage.getRoutingKey(),
                          returnedMessage.getMessage().toString());
                  // 其他操作:消息重发等
                  // Todo
              });
          }
      
      }
    3. 发送消息,指定消息ID、消息ConfirmCallback,可以有多个

          @Test
          public void testSendMessage() {
              String routingKey = "simple";
              // 准备消息
              String message = "LonelySnow Test";
              // 准备CorrelationData
              // 消息id
              CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
              // 准备ConfirmCallback
              correlationData.getFuture().addCallback(result -> {
                  // 判断结果
                  if (result.isAck()) {
                      // ACK
                      log.info("消息成功到交换机!消息id:{}", correlationData.getId());
                  } else {
                      // NACK
                      log.error("消息投递到交换机失败!消息id:{}", correlationData.getId());
                      // 可以进行消息重发等操作
                  }
              }, ex -> {
                  // 失败回调
                  // 某些操作,这里记录日志
                  log.error("消息发送失败!", ex);
              });
              // 发送消息
              rabbitTemplate.convertAndSend("amq.topic", routingKey, message);
          }

四、消息持久化

  1. 在控制台创建交换机以及代码的时候,可以将Durability选项选择为Durable,那么就可以将数据持久化

  2. 代码层面操作

    1. 交换机持久化

          @Bean
          public DirectExchange simpleDirect() {
              // 交换机名称,是否持久化,,当前没有队列与其绑定时是否自动删除
              return new DirectExchange("simple.direct", true, false);
          }

      备注:需要在消费者方面进行声明

      SpringAMQP默认交换机是持久化的

    2. 队列持久化

          @Bean
          public Queue simpleQueue() {
              // 使用QueueBuilder构建队列,durable就是持久化的
              return QueueBuilder.durable("simple.queue").build();
          }

      备注:需要在消费者方面进行声明

      SpringAMQP默认队列也是持久化的

    3. 消息持久化

      消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定:

              Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))  // msg是消息内容
                      .setDeliveryMode(MessageDeliveryMode.PERSISTENT)  // 定义是否持久化,不持久化:NON_PERSISTENT
                      .build();  // 开始构建

      备注:需要在生产者发送消息时候定义

      SpringAMQP默认也是持久化的

五、消费者确认消息

  1. RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式

    1. manual:手动ack,需要在业务代码结束后,调用api发送ack

    2. auto:自动ack,由Spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐)

    3. none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

  2. 配置方式:修改yml配置文件,添加如下配置

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1
            acknowledge-mode: none # none:关闭ack; manual:手动ack; auto:自动ack

    注意:消费者确认!消费方添加

六、消费失败重试机制

  1. 当消费者出现异常后(确认消息返回nack),消息会不断requeue(重新入队)到队列,在重新发送给消费者,然后在此异常,再次requeue,无限循环,导致MQ的消息处理飙升,带来不必要的压力

  2. 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

    解决办法:配置yaml文件即可

    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true  # 开启消费者失败重试
              initial-interval: 1000  # 初次失败等待时长,1s
              multiplier: 1  # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval(上一次等待时长)
              max-attempts: 3  # 最大重试次数
              stateless: true  # true无状态;false有状态。如果业务汇总包含事务,这里改为false

    备注:消费方的问题,在消费者配置

    超过最大次数,消息就会被丢弃

  3. 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

    1. RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息。默认

    2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队列

    3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到制定的交换机(推荐)

  4. RepublishMessageRecoverer推荐模式做法:(消费方)

    1. 首先定义失败消息的交换机、队列以及其绑定关系

      @Bean
      public DirectExchange errorMessageExchange() {
          return new DirectExchange("error.direct");
      }
      
      @Bean
      public Queue errorQueue() {
          return new Queue("error.queue", true);
      }
      
      @Bean
      public Binding errorBinding() {
          // error为RoutingKey
          return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
      }
    2. 定义RepublishMessageRecoverer:

      @Bean
      public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
          return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
      }

      备注:交换机以及Routing Key必须与上面定义的相同!!

最后更新于

这有帮助吗?