4.3 SpringAMQP

一、Basic Queue简单队列模型

  1. 父类中引入spring-amqp依赖

  2. 在yml中配置mq的地址,虚拟机,端口,用户名,密码等即可

  3. 在publisher服务中利用RabbitMQTemplate发送消息到simple.queue这个队列

  4. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

    备注:如果Mq中没有simple.queue这个队列,需要手动在MQ中创建这个队列

  5. 消费者需要编写监听方法,监听对应的队列即可

// 配置信息
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: 用户名
    password: 密码
        
// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
    String queueName = "simple.queue";
    String message = "LonelySnow test";

    rabbitTemplate.convertAndSend(queueName, message);
}

// 消费者
@Component
public class RabbitMqListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String message) {
        System.out.println("消费者接收到消息:" + message);
    }

}

二、Work Queue工作队列模型

Work Queue工作队列图
  1. 工作队列,可以提高消息处理速度,避免消息队列堆积

  2. 案例方法

    1. 生产者服务中,每秒产生50条消息,发送到simple.queue

    2. 消费者中定义两个监听方法,同时监听simple.queue

    3. 消费者1每秒消费50条,消费者2每秒消费10条

  3. 注意:

    1. RabbitMQ在不做配置的情况下,会有消息预取的情况,也就是先将消息平均分配,然后再各自慢慢消费

    2. 解决办法:

      1. 在消费者的yml配置中进行设置,控制消息预取的上限

        spring:
          rabbitmq:
            listener:
              simple:
                prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一条消息
// 引用同简单模式,这里只列举方法
// 生产者:
@Test
public void testWorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "LonelySnow test:";

    for (int i = 1; i <= 50; i++) {
        rabbitTemplate.convertAndSend(queueName, message + i);
        # 休眠20毫秒,保证50条消息一秒钟发送完
        Thread.sleep(20);
    }
}

// 消费者
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
    System.out.println("消费者  1  接收到消息:【" + message + "】 " + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
    System.out.println("消费者  2  接收到消息:【" + message + "】 " + LocalTime.now());
    Thread.sleep(200);
}

三、发布订阅模型

  1. 工作队列与简单队列属于一对一,被消费了就没了

  2. 发布订阅模型允许将统一消息发送给多个消费者,消息可以被多次消费。实现方式就是加入了交换机

  3. 常见的交换机类型

    1. Fanout:广播

    2. Direct:路由

    3. Topic:话题

  4. 注意:exchange只负责消息转发,不负责消息存储,如果路由失败,则消息丢失

发布订阅模型图

四、发布、订阅模型-Fanout

  1. Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

  2. 操作方式

    1. 在consumer服务中,利用代码声明两个队列、交换机、并将两者绑定

    2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

    3. 在publisher中编写测试方法,向交换机发送消息

// 消费端声明交换机以及队列,并进行绑定
@Configuration
public class FanoutConfig {

    // 声明交换机
    @Bean
    public FanoutExchange snowFanoutExchange() {
        return new FanoutExchange("snow.fanout");
    }

    // 声明队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    // 声明队列2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // 声明绑定关系
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange snowFanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(snowFanoutExchange);
    }

    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange snowFanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(snowFanoutExchange);
    }

}


// 监听程序
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
    System.out.println("消费者接收到fanout.queue1消息:【" + message + "】 ");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
    System.out.println("消费者接收到fanout.queue2消息:【" + message + "】 ");
}


// 发送消息
@Test
public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "snow.fanout";
    // 消息内容
    String msg = "FanoutExchange Message ";
    // 发送消息  交换机名称,BingingKey,消息内容
    rabbitTemplate.convertAndSend(exchangeName, "", msg);
}

五、发布、订阅模型-Direct

  1. DirectExchange会将收到的消息根据规则路由到指定的Queue,因此被称为路由模式

    1. 每一个Queue都与Exchange设置一个BindingKey

    2. 发布者发送消息时,指定消息的RoutingKey

    3. Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

  2. 实现思路

    1. 可以利用@RabbitListener声明Exchange、Queue、RoutingKey

// 消费者监听程序
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "snow.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
))
public void ListenDirectQueue1(String msg) {
    System.out.println("消费者接收到direct.queue1消息:【" + msg + "】 ");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "snow.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
))
public void ListenDirectQueue2(String msg) {
    System.out.println("消费者接收到direct.queue2消息:【" + msg + "】 ");
}


// 生产者发送消息
@Test
public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "snow.direct";
    // 消息内容
    String msg1 = "DirectExchange Message blue";
    String msg2 = "DirectExchange Message yellow";
    String msg3 = "DirectExchange Message red";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", msg1);
    rabbitTemplate.convertAndSend(exchangeName, "yellow", msg2);
    rabbitTemplate.convertAndSend(exchangeName, "red", msg3);
}

六、发布、订阅模型-Topic

  1. TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并以.分割

  2. Queue与Exchange指定BindingKey时,可以使用通配符:

    1. #:代指0个或多个单词

    2. *:代指一个单词

// 消费方交换机规则
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "snow.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenDirectTopic1(String msg) {
    System.out.println("消费者接收到topic.queue1消息:【" + msg + "】 ");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "snow.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
))
public void listenDirectTopic2(String msg) {
    System.out.println("消费者接收到topic.queue2消息:【" + msg + "】 ");
}


// 生产方生产
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "snow.topic";
    // 消息内容
    String msg1 = "中国消息";
    String msg2 = "天气信息";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg1);
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg2);
}

七、消息转换器

  1. 在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送;

  2. 修改JDK序列化为JSON序列化

    1. 定义一个MessageCoverter类型的Bean即可

    2. 在生产方,publisher服务引入依赖

      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
      </dependency>
    3. 在publisher服务中声明MessageConverter

      @Configuration
      public class MqMessageConverter {
      
          @Bean
          public MessageConverter jsonMessageConverter() {
              return new Jackson2JsonMessageConverter();
          }
      
      }
    4. 生产者发送消息

      @Test
      public void sendObject() {
          Map<String, Object> msg = new HashMap<>();
          msg.put("name", "北雪南星");
          msg.put("age", 25);
          rabbitTemplate.convertAndSend("object.queue", msg);
      }
    5. 消费方处理

      1. 引入依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
      2. 声明MessageConverter

        @Configuration
        public class MqMessageConverter {
        
            @Bean
            public MessageConverter jsonMessageConverter() {
                return new Jackson2JsonMessageConverter();
            }
        
        }
      3. 编写接收队列内容

        @RabbitListener(queues = "object.queue")
        public void listenObjectQueue(Map<String, Object> msg) {
            System.out.println("接收到Object.queue消息" + msg);
        }
      4. 首先定义通道

        @Bean
        public Queue objectQueue() {
            return new Queue("object.queue");
        }

最后更新于

这有帮助吗?