5.1 消息可靠性
一、MQ的一些常见问题
消息可靠性问题
如何确保发送的消息至少被消费一次
延迟消息问题
如何实现消息的延迟投递
消息堆积问题
如何解决数百万消息堆积,无法及时消费的问题
高可用问题
如何避免单点的MQ故障导致的不可用问题
二、消息可靠性问题
消息发送者发送到交换机(exchange),再到队列(queue),再到消费者,消息丢失的情况如下
发送时丢失
生产者发送的消息未到达exchange
消息到达exchange后未到达queue
MQ宕机,queue将丢失消息
消费者收到消息后未消费就宕机
三、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,标识消息是否处理成功。结果有两种请求:
publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因
注意:确认机制发送消息时,需要个每一个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
SpringAMQP实现生产者确认
在yml中添加配置
spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher-confirm,异步回调方式 publisher-returns: true # 开启publisher-returns template: # 定义消息路由失败策略 mandatory: true说明:
publisher-confirm-type:开始publisher-confirm,这里支持两种类型 simple:同步等待confirm结果,直到超时
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns:开启publisher-returns功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:直接丢弃消息每一个TabbitTemplate只能配置一个ReturnCallback,因此需要在项目中配置:
@Configuration @Slf4j public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnsCallback(returnedMessage -> { // 记录日志 log.info("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString()); // 其他操作:消息重发等 // Todo }); } }发送消息,指定消息ID、消息ConfirmCallback,可以有多个
@Test public void testSendMessage() { String routingKey = "simple"; // 准备消息 String message = "LonelySnow Test"; // 准备CorrelationData // 消息id CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 准备ConfirmCallback correlationData.getFuture().addCallback(result -> { // 判断结果 if (result.isAck()) { // ACK log.info("消息成功到交换机!消息id:{}", correlationData.getId()); } else { // NACK log.error("消息投递到交换机失败!消息id:{}", correlationData.getId()); // 可以进行消息重发等操作 } }, ex -> { // 失败回调 // 某些操作,这里记录日志 log.error("消息发送失败!", ex); }); // 发送消息 rabbitTemplate.convertAndSend("amq.topic", routingKey, message); }
四、消息持久化
在控制台创建交换机以及代码的时候,可以将
Durability选项选择为Durable,那么就可以将数据持久化代码层面操作
交换机持久化
@Bean public DirectExchange simpleDirect() { // 交换机名称,是否持久化,,当前没有队列与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false); }备注:需要在消费者方面进行声明
SpringAMQP默认交换机是持久化的
队列持久化
@Bean public Queue simpleQueue() { // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }备注:需要在消费者方面进行声明
SpringAMQP默认队列也是持久化的
消息持久化
消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定:
Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) // msg是消息内容 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 定义是否持久化,不持久化:NON_PERSISTENT .build(); // 开始构建备注:需要在生产者发送消息时候定义
SpringAMQP默认也是持久化的
五、消费者确认消息
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式
manual:手动ack,需要在业务代码结束后,调用api发送ack
auto:自动ack,由Spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack(推荐)
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
配置方式:修改yml配置文件,添加如下配置
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none # none:关闭ack; manual:手动ack; auto:自动ack注意:消费者确认!消费方添加
六、消费失败重试机制
当消费者出现异常后(确认消息返回nack),消息会不断requeue(重新入队)到队列,在重新发送给消费者,然后在此异常,再次requeue,无限循环,导致MQ的消息处理飙升,带来不必要的压力
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
解决办法:配置yaml文件即可
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初次失败等待时长,1s multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval(上一次等待时长) max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务汇总包含事务,这里改为false备注:消费方的问题,在消费者配置
超过最大次数,消息就会被丢弃
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息。默认
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队列
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到制定的交换机(推荐)
RepublishMessageRecoverer推荐模式做法:(消费方)
首先定义失败消息的交换机、队列以及其绑定关系
@Bean public DirectExchange errorMessageExchange() { return new DirectExchange("error.direct"); } @Bean public Queue errorQueue() { return new Queue("error.queue", true); } @Bean public Binding errorBinding() { // error为RoutingKey return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); }定义RepublishMessageRecoverer:
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }备注:交换机以及Routing Key必须与上面定义的相同!!
最后更新于
这有帮助吗?