5.2 死信交换机
一、初识死信交换机
当队列中的消息满足满足下列情况之一时,可以称为死信(dead letter):
消费者使用basic.reject或basic.nack声明消费失败,而且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
如何给队列绑定死信交换机
给队列设置dead-letter-exchange属性,指定一个交换机
给队列上设置dead-letter-routing-key属性,设置死信交换机与死信队列的RutingKey
二、TTL
TTL,也就是Time-To-Live,如果一个队列中的消息TTL结束仍未消费,则会变成死信,TTL超时分为两种情况:
消息所在的队列设置了存活时间
消息本身设置了存活时间
声明一组死信交换机和队列,基于注解方式:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue", durable = "true"), exchange = @Exchange(name = "dl.direct"), key = "dl" )) public void listenDlQueue(String msg) { System.out.println("接收到了dl.queue的延迟消息"); }备注:消费者中,声明死信交换机以及队列,在监听消息的方法中
给队列设施超时时间,并指定死信交换机,需要在声明队列的时候
@Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue() { return QueueBuilder .durable("ttl.queue") // 指定队列名称,并持久化 .ttl(10000) // 设置队列的超时时间,10秒 .deadLetterExchange("dl.direct") // 指定死信交换机 .deadLetterRoutingKey("dl") // 指定死信RoutingKey .build(); } @Bean public Binding ttlBinding() { return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl"); }备注:声明正常队列,同时设置超时时间,并且绑定死信交换机
发送消息:
发送消息从发送到ttl交换机以及队列中即可
发送消息时,给消息本身设置超时时间
Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) // msg是消息内容 .setExpiration("5000") // 设置消息的超时时间,5s .build(); // 开始构建备注:生产者创建消息的时候指定
队列与消息都有超时时间时,以超时时间短的时间为准
三、延迟队列
利用TTL结合死信交换机,实现了消息发送后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式,应用场景包括:
延迟短信
用户下单,支付时间
预约会议,一段时间后通知所有人
延迟队列插件
安装方式:附录
SpringAMQP使用延迟队列插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能,因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设置delayed属性为true即可
基于注解方式(推荐):
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue", durable = "true"), exchange = @Exchange(name = "dl.direct", delayed = "true"), key = "dl" )) public void listenDlQueue(String msg) { System.out.println("接收到了dl.queue的延迟消息"); }基于Bean注解方式:
@Bean public DirectExchange delayedExchange() { return ExchangeBuilder .directExchange("delay.direct") // 指定交换机类型和名称 .delayed() // 设置delay属性为true .durable(true) // 设置持久化 .build(); } @Bean public Queue delayedQueue() { return new Queue("delay.queue"); } @Bean public Binding delayedBinding() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay"); }发送消息(设置延迟时间):
// 创建消息 Message message = MessageBuilder .withBody("消息内容".getBytes(StandardCharsets.UTF_8)) .setHeader("x-delat", 10000) .build();备注:设置头信息x-delat就是延迟消息,后面就是延迟时间,单位ms
因为是延迟消息,MQ会误报消息发送到交换机失败,因此需要在全局的发送确认机制处做一个校验,校验返回的错误内容中
receivedDelay是否有值存在,存在,则说明是延迟队列,否则投递失败@Configuration @Slf4j public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnsCallback(returnedMessage -> { // 判断是否是延迟消息 if (returnedMessage.getMessage().getMessageProperties().getReceivedDelay() <= 0) { // 记录日志 log.info("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage().toString()); // 其他操作:消息重发等 // Todo } else { // 是一个延迟队列,可以忽略 } }); } }改造后的全局捕获,支持延迟消息
最后更新于
这有帮助吗?