4.3 SpringAMQP
一、Basic Queue简单队列模型
父类中引入spring-amqp依赖
在yml中配置mq的地址,虚拟机,端口,用户名,密码等即可
在publisher服务中利用RabbitMQTemplate发送消息到simple.queue这个队列
在consumer服务中编写消费逻辑,绑定simple.queue这个队列
备注:如果Mq中没有simple.queue这个队列,需要手动在MQ中创建这个队列
消费者需要编写监听方法,监听对应的队列即可
// 配置信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: 用户名
password: 密码
// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "LonelySnow test";
rabbitTemplate.convertAndSend(queueName, message);
}
// 消费者
@Component
public class RabbitMqListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者接收到消息:" + message);
}
}二、Work Queue工作队列模型

工作队列,可以提高消息处理速度,避免消息队列堆积
案例方法
生产者服务中,每秒产生50条消息,发送到simple.queue
消费者中定义两个监听方法,同时监听simple.queue
消费者1每秒消费50条,消费者2每秒消费10条
注意:
RabbitMQ在不做配置的情况下,会有消息预取的情况,也就是先将消息平均分配,然后再各自慢慢消费
解决办法:
在消费者的yml配置中进行设置,控制消息预取的上限
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一条消息
// 引用同简单模式,这里只列举方法
// 生产者:
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "LonelySnow test:";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
# 休眠20毫秒,保证50条消息一秒钟发送完
Thread.sleep(20);
}
}
// 消费者
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("消费者 1 接收到消息:【" + message + "】 " + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.out.println("消费者 2 接收到消息:【" + message + "】 " + LocalTime.now());
Thread.sleep(200);
}三、发布订阅模型
工作队列与简单队列属于一对一,被消费了就没了
发布订阅模型允许将统一消息发送给多个消费者,消息可以被多次消费。实现方式就是加入了交换机
常见的交换机类型
Fanout:广播
Direct:路由
Topic:话题
注意:exchange只负责消息转发,不负责消息存储,如果路由失败,则消息丢失

四、发布、订阅模型-Fanout
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
操作方式
在consumer服务中,利用代码声明两个队列、交换机、并将两者绑定
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
在publisher中编写测试方法,向交换机发送消息
// 消费端声明交换机以及队列,并进行绑定
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange snowFanoutExchange() {
return new FanoutExchange("snow.fanout");
}
// 声明队列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 声明队列2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 声明绑定关系
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange snowFanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(snowFanoutExchange);
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange snowFanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(snowFanoutExchange);
}
}
// 监听程序
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
System.out.println("消费者接收到fanout.queue1消息:【" + message + "】 ");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
System.out.println("消费者接收到fanout.queue2消息:【" + message + "】 ");
}
// 发送消息
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "snow.fanout";
// 消息内容
String msg = "FanoutExchange Message ";
// 发送消息 交换机名称,BingingKey,消息内容
rabbitTemplate.convertAndSend(exchangeName, "", msg);
}五、发布、订阅模型-Direct
DirectExchange会将收到的消息根据规则路由到指定的Queue,因此被称为路由模式
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现思路
可以利用@RabbitListener声明Exchange、Queue、RoutingKey
// 消费者监听程序
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "snow.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void ListenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1消息:【" + msg + "】 ");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "snow.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void ListenDirectQueue2(String msg) {
System.out.println("消费者接收到direct.queue2消息:【" + msg + "】 ");
}
// 生产者发送消息
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "snow.direct";
// 消息内容
String msg1 = "DirectExchange Message blue";
String msg2 = "DirectExchange Message yellow";
String msg3 = "DirectExchange Message red";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", msg1);
rabbitTemplate.convertAndSend(exchangeName, "yellow", msg2);
rabbitTemplate.convertAndSend(exchangeName, "red", msg3);
}六、发布、订阅模型-Topic
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并以
.分割Queue与Exchange指定BindingKey时,可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
// 消费方交换机规则
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "snow.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenDirectTopic1(String msg) {
System.out.println("消费者接收到topic.queue1消息:【" + msg + "】 ");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "snow.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenDirectTopic2(String msg) {
System.out.println("消费者接收到topic.queue2消息:【" + msg + "】 ");
}
// 生产方生产
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "snow.topic";
// 消息内容
String msg1 = "中国消息";
String msg2 = "天气信息";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", msg1);
rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg2);
}七、消息转换器
在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送;
修改JDK序列化为JSON序列化
定义一个MessageCoverter类型的Bean即可
在生产方,publisher服务引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>在publisher服务中声明MessageConverter
@Configuration public class MqMessageConverter { @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }生产者发送消息
@Test public void sendObject() { Map<String, Object> msg = new HashMap<>(); msg.put("name", "北雪南星"); msg.put("age", 25); rabbitTemplate.convertAndSend("object.queue", msg); }消费方处理
引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>声明MessageConverter
@Configuration public class MqMessageConverter { @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }编写接收队列内容
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String, Object> msg) { System.out.println("接收到Object.queue消息" + msg); }首先定义通道
@Bean public Queue objectQueue() { return new Queue("object.queue"); }
最后更新于
这有帮助吗?