学静思语
Published on 2025-06-23 / 3 Visits
0
0

RabbitMQ-消息队列

RabbitMQ-消息队列

一、RabbitMQ学习步骤

1. 基础篇

1.1 同步和异步

1.2 MQ技术选型

1.3 数据隔离

1.4 SpringAMQP

1.5 work模式

1.6 MQ消息转换器

1.7 发布订阅模式

1.8 消息堆积问题处理

2. 高级篇

2.1 发送者重连

2.2 发送者确定

2.3 MQ持久化

2.4 LazyQueue

2.5 消费者确认

2.6 失败重试

2.7 业务幂等

2.8 延迟消息

二、同步和异步

1. 同步

1.1 同步的优势

  • 时效性强,等待结果后才返回

1.2 同步调用的缺点

  • 扩展性差
  • 性能下降
  • 级联失败问题

2. 异步

2.1 异步调用

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
  • 消息发送者:消息的投递者,也就是消息的发送方
  • 消息代理:管理、暂存、转发消息,可以理解为一个中间件
  • 消息消费者:接收消息和处理消息的消费者,也就是用来接收消息的接收方

image-20250417170643229

2.2 异步调用的优势

  • 耦合度低,扩展性强
  • 异步调用,无需等待,性能好。
  • 故障隔离,下游服务器故障不影响上游业务
  • 缓存消息,流量削峰填谷

2.3 异步调用的缺点

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务完全依赖于Broker的可靠性

三、MQ的技术选型

  • MQ(Message Queue),中文是消息队列,字面来看就是存放消息的队列,也就是异步调用中的Broker

| | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
| ———- | ——————– | —————————– | ———- | ———- |
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | java | java | Scala&java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | QpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |

四、Rabbit MQ

1. 介绍和安装

1.1 介绍

  • 官方网址:https://www.rabbitmq.com/

    image-20250417172405412

  • RabbitMQ的整体架构以及核心思想

    image-20250417222413328

  • VirtualHost:虚拟主机,起着数据隔离的作用

  • publisher:发布者,负责消息的发送

  • consumer:消费者,负责接收消息

  • exchange:交换机,负责路由消息

  • queue:队列,用于存储消息

1.2 安装

  • 使用Docker进行安装

    docker run \
    --name rabbitmq \
    -e RABBITMQ_DEFAULT_USER=leon \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    --hostname mq \
    -v /usr/local/rabbitmq/mq-plugins:/mq_plugins \
    -p 15672:15672 \
    -p 5672:5672 \ 
    -d \
    rabbitmq:3.8-management
    
  • 解读

  • –name rabbitmq 指定运行镜像名称

  • -e RABBITMQ_DEFAULT_USER=leon 设置默认登录的账号

  • -e RABBITMQ_DEFAULT_PASS=123456 设置默认登录账号的密码

  • –hostname mq 设置当前RabbitMQ主机的名称,这样就不会使用默认的主机名称

  • -p 15672:15672 配置管理控制台的图形化界面的端口

  • -p 5672:5672 配置消息发送处理接口

  • -d 后台运行

  • rabbitmq:3.8-management RabbitMQ的镜像

  • 通过ip:15672访问管理控制台

    image-20250417220446276

2. 快速入门

2.1 管理控制台

image-20250417220829553

image-20250417221432068

image-20250417221553036

image-20250417221700923

2.2 案例演示

2.2.1 需求:在RabbitMQ的控制台完成下列操作:
  • 新建队列hello.queue1和hello.queue2

  • 向默认的amp.fanout交换机发送一条消息

  • 查看消息是否到达hello.queue1和hello.queue2

  • 绑定(不用路由key)上述交换机与队列之后再发送

  • 总结规律

image-20250417223136854

image-20250417223223139

image-20250417223314030

image-20250417223343666

image-20250417223526625

image-20250417223621426

image-20250417223659663

image-20250417223811461

  • 总结
  • 交换机只能路由消息,不能存储消息。
  • 交换机只会将消息发送给与交换机绑定的消息队列,因此消息队列需要与交换机绑定

3. 数据隔离

3.1 需求:在RabbitMQ的控制台完成下列操作:

  • 新建一个用户hmall,密码123,administrator权限
  • 为hmall用户创建一个virtual host;名字为/hmall
  • 测试不同virtual host之间的数据隔离现象

image-20250417224644742

image-20250417224800546

image-20250417225216548

image-20250417225257890

五、Java客户端

1. 快速入门

1.1 AMQP

  • AMQP
  • Adcanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
  • Spring AMQP
  • Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含了两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

1.2 快速入门案例

  • 创建发布者和消费者项目

    image-20250418110551180

    image-20250418110608437

  • 使用父项目聚合,管理发布者和消费者,并导入相关依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.7.12</version>
      </parent>
    
      <groupId>com.leon</groupId>
      <version>1.0-SNAPSHOT</version>
      <artifactId>rabbitmq_study</artifactId>
    
      <modules>
          <module>consumers</module>
          <module>publisher</module>
      </modules>
    
      <properties>
          <maven.compiler.source>11</maven.compiler.source>
          <maven.compiler.target>11</maven.compiler.target>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
          </dependency>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
          </dependency>
      </dependencies>
    
    </project>
    
  • 在发布者和消费者的application.yml文件中配置Rabbit主机位置及其相关配置

    spring:
    # 设置RabbitMQ的IP地址,端口,虚拟主机,账号,密码
    rabbitmq:
      # IP地址
      addresses: IP地址
      # 端口号
      port: 5672
      # 虚拟主机
      virtual-host: 虚拟主机
      # 账号
      username: 账号
      # 密码
      password: 密码
    
  • 创建主驱动类

    package com.leon.rabbitmq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
    
  • ClassName:ConsumerApplication

  • Package:com.leon.rabbitmq

  • Description:
    *

  • @Author: leon

  • @Version: 1.0
    */
    @SpringBootApplication
    public class ConsumerApplication {

    public static void main(String[] args) {

      SpringApplication.run(ConsumerApplication.class,args);
    

    }

    }

    package com.leon.rabbitmq;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    /**

  • ClassName:PublisherApplication

  • Package:com.leon.rabbitmq

  • Description:

  • @Author: leon

  • @Version: 1.0
    */
    @SpringBootApplication
    public class PublisherApplication {

    public static void main(String[] args) {

    SpringApplication.run(PublisherApplication.class,args);
    

    }

    }

  • 需求如下:

  • 利用控制台创建队列simple.queue

  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息

  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

  • 案例实现

  • 发布者代码

    package com.leon.rabbitmq.publisher;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    /**
    * ClassName:PublisherApplicationTest
    * Package:com.leon.rabbitmq.publisher
    * Description:
    * @Author: leon
    * @Version: 1.0
    */
    @SpringBootTest
    public class PublisherApplicationTest {
    
       @Autowired
       private RabbitTemplate rabbitTemplate;
    
       @Test
       public void testSimpleQueue(){
          //设置消息
          String message = "Hello java rabbitmq ";
          // 消息队列名
          String queueName = "simple.queue";
          // 发布消息
          rabbitTemplate.convertAndSend(queueName,message);
    
    
       }
    
    
    }
    
  • 消费者代码

    package com.leon.rabbitmq.consumer;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * ClassName:Consumer
     * Package:com.leon.rabbitmq.consumer
     * Description:
     *
     * @Author: leon
     * @Version: 1.0
     */
    @Component
    public class Consumer {
    
        @RabbitListener(queues ={"simple.queue"} )
        public void readSimpleQueue(String message){
    
            System.out.println("接收到来之simple.queue队列的消息: ====>" + message);
    
        }
    
    }
    

2. WorkQueue

2.1 任务模型

  • Work Queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

    image-20250418162119101

2.2 案例演示

  • 模拟WorkQueue,实现一个队列绑定多个消费者

  • 在RabbitMQ的控制台创建一个队列,名为work.queue

  • 在publisher服务中定义测试方法,在1秒内产生50条消息(每发一条沉睡20ms),发送到work.queue

  • 在consumer服务中定义两个消息监听者,都监听work.queue队列

  • 消费者1每秒处理50条(沉睡20ms)消息,消费者2每秒处理5条(沉睡200ms)消息

  • 代码实现

  • 发布者代码

    // 向work.queue队列发送信息
    @Test
    public void testWorkQueue() {
    
        // 设置消息名称
        String message = " Hello java rabbitmq ---- work.queue";
        // 设置队列名称
        String queueName = "work.queue";
        // 循环发送信息
        for (int i = 1; i <= 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + "=="+ i);
    
            try {
                // 睡眠
                Thread.sleep(20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
        }
    
    }
    
  • 消费者代码

    @RabbitListener(queues = {"work.queue"})
    public void readWorkQueue1(String message){
        // 发送消息
        System.out.println("消费者 1 ,正在消费work.queue队列的中信息 ====>" + message);
    
        try {
            // 睡眠
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
    @RabbitListener(queues = {"work.queue"})
    public void readWorkQueue2(String message){
        // 发送消息
        System.out.println("消费者 2 ,正在消费work.queue队列的中信息 ====>" + message);
    
        try {
            // 睡眠
            Thread.sleep(200);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
  • 为了最大化的发挥消费的能力,将获取消息的数量设置为1

    spring:
    rabbitmq:
      listener:
        # 简单模式
        simple:
          # 设置每次重消息队列中获取的消息数量为1
          prefetch: 1
    
  • 总结

  • 多个消费者绑定到一个队列,可以加快消息处理速度。

  • 同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

3. Fanout交换机

3.1 交换机

  • 真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有三种:

  • Fanout:广播

  • Direct:定向

  • Topic:话题

    image-20250418164818412

3.2 Fanout交换机

  • Fanout Exchange会将收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

    image-20250418164939718

3.3 案例演示

  • 利用SpringAMQP演示FanoutExchange的使用

  • 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

  • 在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

  • 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

  • 在publisher中编写测试方法,向hmall.fanout发送消息

    image-20250418165104622

  • 代码实现

  • 发布者代码

    // 向leon.fanout交换机发送信息
    @Test
    public void testFanoutQueue(){
        // 设置消息名称
        String message = " Hello java rabbitmq ---- leon.fanout";
        // 设置队列名称
        String exchangeName = "leon.fanout";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    
    
    }
    
  • 消费者代码

    // 读取fanout交换机发出的信息
    @RabbitListener(queues = {"fanout.queue1"})
    public void readFanoutQueue1(String message){
    
        System.out.println("消费者 1 消费了fanout.queue1队列中的消息 ====>" + message);
    
    }
    
    @RabbitListener(queues = {"fanout.queue2"})
    public void readFanoutQueue2(String message){
    
        System.out.println("消费者 2 消费了fanout.queue2队列中的消息 ====>" + message);
    
    }
    
  • 交换机的作用是什么

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • FanoutExchange的会将消息路由到每个绑定的队列

4.Direct交换机

4.1 Direct交换机

  • Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此称为定向路由。
  • 每一个Queue都与Exchange设置一个RoutingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到消息RoutingKey一致的队列

image-20250418170833592

4.2 案例演示

  • 利用SpringAMQP演示DirectExchange的使用

  • 在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

  • 在RabbitMQ控制台中,声明交换机hmall. direct ,将两个队列与其绑定

  • 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  • 在publisher中编写测试方法,利用不同的RoutingKey向hmall. direct发送消息

    image-20250418171025130

  • 代码实现

  • 发布者代码

    // 获取direct交换机发出的消息
    @RabbitListener(queues = {"direct.queue1"})
    public void readDirectQueue1(String message){
        System.out.println("消费者 1 消费类direct.queue1队列中的消息 ====>" + message);
    }
    
    @RabbitListener(queues = {"direct.queue2"})
    public void readDirectQueue2(String message){
        System.out.println("消费者 2 消费类direct.queue2队列中的消息 ====>" + message);
    }
    
  • 消费者代码

    @Test
    public void testDirectQueue(){
        // 设置消息
        String message = " Hello java rabbitmq ---- leon.direct";
        // 设置交换机名称
        String exchangeName = "leon.direct";
        // 设置Routing Key
        String routingKey = "red";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,routingKey,message+"  "+ routingKey);
        // 重新设置Routing Key
        routingKey = "blue";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,routingKey,message+"  "+ routingKey);
    
    
    }
    
  • 描述一下Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给一个与之绑定的队列

  • Direct交换机根据Routing Key判断路由给哪个队列

  • 如果多个队列具有相同Routing Key,则与Fanout功能类似

5.Topic交换机

5.1 Topic交换机

  • Topic Exchange与Direct Exchange类似,区别在于routing key 可以是多个单词,并且以.分割

  • Queque与Exchange指定Binding Key时可以使用通配符:

    • :代指0个或多个单词(注意此处时单词不是字符,而且单词之间以.隔开)

    • *:代指一个单词(注意此处时单词不是字符,而且单词之间以.隔开)

    image-20250418172932009

5.2 案例演示

  • 利用SpringAMQP演示TopicExchange的使用

  • 在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

  • 在RabbitMQ控制台中,声明交换机hmall. topic ,将两个队列与其绑定

  • 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

  • 在publisher中编写测试方法,利用不同的RoutingKey向hmall. topic发送消息

    image-20250418173557323

  • 代码实现

  • 发布者代码

    @Test
    public void testTopicQueue(){
        // 设置发送消息
        String message = " Hello java rabbitmq ---- leon.topic - 中国 ";
        // 设置交换机
        String exchangeName = "leon.topic";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.message",message);
    
        message = " Hello java rabbitmq ---- leon.topic - 新闻";
    
        rabbitTemplate.convertAndSend(exchangeName,"news",message);
    
        message = " Hello java rabbitmq ---- leon.topic - 中国新闻";
        rabbitTemplate.convertAndSend(exchangeName,"china.message",message);
    
    
    
    }
    
  • 消费者代码

    // 获取topic交换机发出的消息
    @RabbitListener(queues = {"topic.queue1"})
    public void readTopicQueue1(String message){
        System.out.println("消费者 1 消费类topic.queue1队列中的消息 ====>" + message);
    }
    
    @RabbitListener(queues = {"topic.queue2"})
    public void readTopicQueue2(String message){
        System.out.println("消费者 2 消费类topic.queue2队列中的消息 ====>" + message);
    }
    
  • Direct交换机和Topic交换机的差异

  • Topic交换机接收的消息Routing Key可以时多个单词,以.分割,但是Direct只允许一个单词。

  • Topic交换机与队列绑定时的bindingKey可以指定通配符,而Direct不支持通配符。

    • :代表多个单词或0个

    • *:代表一个单词

6.声明队列交换机

6.1 声明队列交换机

  • Spring AMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建

  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

    image-20250418183300525

6.2 案例实现

6.2.1 @Bean实现
  • 创建配置类,在配置类中添加相关配置

    package com.leon.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
    
  • ClassName:RabbitMQConfig

  • Package:com.leon.rabbitmq.config

  • Description:

  •  用于配置队列和交换机,以及绑定队列
    
  • @Author: leon

  • @Version: 1.0
    */
    @Configuration
    public class RabbitMQConfig {

    // 创建一个fanout交换机,名称为java.fanout
    @Bean
    public FanoutExchange fanoutExchange() {

      // 方式一
      //return new ExchangeBuilder("java.fanout","fanout").build();
      // 方式二 durable 是否持久化,交换机不存储数据,所以不需要持久化
      //return ExchangeBuilder.fanoutExchange("java.fanout").durable(false).build();
      // 方式三
      return  new FanoutExchange("java.fanout");
    

    }

    // 创建一个queue
    @Bean
    public Queue queue1(){

      // 方式一
      //return new Queue("java.queue1");
      // 方式二
      return QueueBuilder.durable("java.queue1").build();
    

    }

    // 创建一个queue
    @Bean
    public Queue queue2(){

      // 方式一
      //return new Queue("java.queue2");
      // 方式二
      return QueueBuilder.durable("java.queue2").build();
    

    }
    // 绑定交换机
    @Bean
    public Binding binding1(Queue queue1,FanoutExchange fanoutExchange){

      // 方式一
      return BindingBuilder.bind(queue1).to(fanoutExchange);
    

    }
    // 绑定交换机
    @Bean
    public Binding binding2(Queue queue2,FanoutExchange fanoutExchange){

      // 方式一
      // 方式一
      return BindingBuilder.bind(queue2).to(fanoutExchange);
    

    }
    }

  • 代码实现

  • 发布者代码

    @Test
    public void javaNewQueueAndExchange(){
        // 设置发送消息
        String message = " Hello java rabbitmq ---- java.fanout ";
        // 设置交换机
        String exchangeName = "java.fanout";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
    
  • 消费者代码

    // 获取通化代码创建的fanout交换机的消息
    @RabbitListener(queues = {"java.queue1"})
    public void readJavaQueue1(String message){
        System.out.println("消费者 1 消费类topic.queue1队列中的消息 ====>" + message);
    }
    
    @RabbitListener(queues = {"java.queue2"})
    public void readJavaQueue2(String message){
        System.out.println("消费者 2 消费类topic.queue2队列中的消息 ====>" + message);
    }
    
6.2.2 注解实现
  • 消费者代码
@RabbitListener(bindings = {@QueueBinding(
        // 创建队列,名称为annotation.queue
        value= @Queue(name = "annotation.queue"),exchange = @Exchange(name = "annotation.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"})} )
public void readAnnotationQueue(String message){
    System.out.println("消费者---->消费annotation.direct队列中的消息 ====> " + message);
}
  • 发布者代码
@Test
public void javaAnnotationAndExchange(){
    // 设置发送消息
    String message = " Hello java rabbitmq ---- annotation.fanout ";
    // 设置交换机
    String exchangeName = "annotation.direct";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"red",message);

    message = "annotation.direct";

    rabbitTemplate.convertAndSend(exchangeName,"blue",message);


}
  • 声明队列、交换机、绑定关系的Bean是什么?
  • Queue
  • FanoutExchange、DirectExchange、TopicExchange
  • Binding
  • 基于注解@RabbitListener注解声明队列和交换机有哪些常见注解
  • @Queue
  • @Exchange

7.消息转换器

7.1 案例演示

  • 需求:测试利用SpringAMQP发送对象类型的消息

  • consumer中编写MessageConfig声明一个队列,名为object.queue

  • 编写单元测试,向队列中直接发送一条消息,消息类型为Map

  • 在控制台查看消息,总结你能发现的问题

  • 实现代码

  • 发布者代码

    @Test
    public void testObjectQueue(){
        // 设置队列名称
        String queueName = "object.queue";
        // 创建数据
        Map<String, Object> map = new HashMap<>();
    
        map.put("age",18);
    
        map.put("name","张三");
        // 发送消息
        rabbitTemplate.convertAndSend(queueName,map);
    
    }
    
  • 消费者代码

    @RabbitListener(queues = {"object.queue"})
    public void readObjectQueue(Map<String,Object> message){
        System.out.println("读取object.queue队列的消息 =====>" + message);
    }
    
  • 总结

  • Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageCpnverter来处理的,默认是实现是SimpleMessageConverter,是基于JDK中的ObjectOutputStream来实现的序列化的

    • JDK序列化有安全风险
    • JDK序列化可读性很差
    • JDK序列化消息量太大
  • 使用JSON序列化代替默认序列化

  • 导入依赖包

    <!-- 导入消息转换器器-->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  • 在发布者和消费者的配置类中配置转换器

    package com.leon.rabbitmq.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * ClassName:JsonMessageConverter
     * Package:com.leon.rabbitmq.config
     * Description:
     *
     * @Author: leon
     * @Version: 1.0
     */
    @Configuration
    public class JsonMessageConverter {
    
        @Bean
        public MessageConverter messageConverter(){
            // 创建消息转换对象
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            // 开启id标识
            converter.setCreateMessageIds(true);
            return converter ;
        }
    
    }
    

六、生产者的可靠性

1. 生产者重试

  • 有的时候由于网路波动,可能会出现客户端连接MQ失败的情况。通过配置可以开启连接失败后的重试机制

  • 配置

    spring:
    rabbitmq:
      #设置超时时间
      connection-timeout: 1
      template:
        retry:
          # 开启超时重试机制
          enabled: true
          # 失败后的初始等待时间
          initial-interval: 1000ms
          # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
          multiplier: 1
          # 最大重试次数,默认是3次
          max-attempts: 3
    
  • 发布者测试代码

    package com.leon.rabbitmq.publisher;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    /**
    
  • ClassName:PublisherApplicationTest2

  • Package:com.leon.rabbitmq.publisher

  • Description:
    *

  • @Author: leon

  • @Version: 1.0
    */
    @SpringBootTest
    public class PublisherApplicationTest2 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testPublisherRetry() {

      // 设置发送消息
      String message = "Hello World";
      // 发送消息
      rabbitTemplate.convertAndSend("simple.queue", message);
    

    }

    }

  • 注意

  • 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程式被阻塞的,会影响业务性能。

  • 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2. 生产者确认

2.1 生产者确认

  • RabbitMQ有Publisher Confirm和Publisher Return两种确认机制,开启确认机制后,在MQ成功收到消息后会返回确认消息给生成者。返回的结果有以下几种可能:

  • 消息投递到了MQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

2.2 Publisher Return

  • 在publisher配置application.yml配置文件中配置

    spring:
    rabbitmq:
      # 开启消息发送失败返回通知
      publisher-returns: true
      # 消息发送确认模式
      # none 关闭confirm机制 simple 同步阻塞等待MQ的回执消息 correlated MQ异步回调方式返回回执消息
      publisher-confirm-type: correlated
    
  • 配置说明

    • 这里的publsher-confirm-type有三种模式可以设置
    • none:关闭confirm机制
    • simple:同步阻塞等待MQ的回执消息
    • correlated:MQ异步回调方式返回回执消息
  • 创建一个配置类,在配置类种对RabbitTemplate类进行设置

    package com.leon.rabbitmq.publisher.config;
    
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    /**
    
  • ClassName:PublisherReutrn

  • Package:com.leon.rabbitmq.publisher.config

  • Description:
    *

  • @Author: leon

  • @Version: 1.0
    */
    @Configuration
    public class PublisherReturn {

    @PostConstruct
    public void rabbitTemplate(RabbitTemplate rabbitTemplate){

      // 设置返回通知
      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
    
          @Override
          public void returnedMessage(ReturnedMessage returnedMessage) {
    
              System.out.println("一般情况下只有路由出现问题时才会返回=======");
    
              System.out.println("发送消息 = "+returnedMessage.getMessage());
              System.out.println("交换机 = "+returnedMessage.getExchange());
              System.out.println("路由键 = "+returnedMessage.getRoutingKey());
              System.out.println("ReplyTest = "+returnedMessage.getReplyText());
              System.out.println("ReplyCode = "+returnedMessage.getReplyCode());
    
    
          }
      });
    

    }

    }

  • 代码实现

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      @Test
      public void testPublisherReturn(){
          System.out.println(rabbitTemplate);
    
          rabbitTemplate.convertAndSend("leon.direct","1111","hello");
    
      }
    }
    

2.3 Publisher Confirm

@SpringBootTest
public class PublisherApplicationTest2 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testPublisherConfirm(){
        // 创建CorrelationData对象
        CorrelationData correlationData = new CorrelationData();
        // 设置异步确认回调
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            // 这个方法的执行契机是,MQ返回信息正常,但是消费方在接收MQ的消息失败时,才会触发执行,这个几乎不可能触发
            @Override
            public void onFailure(Throwable ex) {
                System.out.println(" 发送消息失败:执行了onFailure");
            }
            // 这个方法时等待MQ返回信息之后,执行
            @Override
            public void onSuccess(CorrelationData.Confirm result) {

                if(result.isAck()){
                    System.out.println("发送消息成功:onSuccess 返回ack");
                }else {
                    System.out.println("发送消息失败:onSuccess 返回nack,失败原因:" + result.getReason());
                }

            }
        });

        rabbitTemplate.convertAndSend("leon.direct","red","hello word",correlationData);

        try {
            // 设置睡眠,等待异步回调结果
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

3.4 总结

3.4.1 Spring AMQP中生产者消息确认的几种返回情况
  • 消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK。
  • 临时消息投递到了MQ,入队成功,返回ACK。
  • 持久消息投递到了MQ,入队成功,返回ACK。
  • 其它原因返回NACK,告知投递失败
3.4.2 如何处理生产者的确认消息?
  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用。
  • 如果一定要使用,无需开启publisher return机制,因为一般路由失败是程序员自身的业务问题
  • 对于NACK消息可以有限次数重试,依然失败则记录异常异常信息
  • 在异步回调情况下,要注意,需要等待异步结果执行完再结束主程序,否则异步回调还没有执行完,就关闭了,因而会出现不会执行publisher Return配置的方法以及Pulisher confirm返回的一直是NACK。

七、MQ的可靠性

1. MQ的可靠性

  • 在默认情况下,RabbitMQ会将接收到的信息保存在内存中降低消息收发的延迟。这样会导致两个问题:

  • 一旦宕机,内存中的消息会丢失

  • 内存空间有限,当消费者或处理过慢时,会导致消息积压,引发MQ阻塞

    image-20250420205621457

2. 数据持久性

  • 持久化可以分为三个方面进行持久化
  • 交换机持久化
  • 队列持久化
  • 消息持久化

2.1 交换机持久化

image-20250420210942901

  • 说明
  • 如果没有进行持久化交换机,在重启RabbitMQ则会被删除,也就是再次重启时没有设置持久化的交换机将不存在了。

2.2 队列持久化

image-20250420211323005

  • 说明
  • 如果没有持久化队列,即再次重启RabbitMQ时,没有持久化的队列将不存在了,也就是被删除了。可以理解为未被保存在磁盘中,而是被保存在内存中

2.3 消息持久化

image-20250420212138642

  • 说明
  • 如果没有将队列的消息进行持久化,那么等下次重启RabbbitMQ则未进行持久化的消息将会丢失。
  • 注意
    • 即使进行了持久化,如果消息被消费了,该持久化的消息也会被删除,也可以理解为并非真正的删除,而是在磁盘中暂存,等消费的时候删除。
2.4 说明
  • 持久性的消息:是在没有消费的情况下存在MQ,消费了的话也被删除
  • 在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
  • 不过处于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化,一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

3. LazyQueue

3.1 LazyQueue
  • 从RabbitMQ的3.6版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
  • 惰性队列的特征如下:
  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘读取并加载到内存
  • 支持数百万条消息存储
  • 在3.12版本后,所有队列都是Lazy Queue默认,无法更改。
3.2 如何设置一个惰性队列
  • 要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

    image-20250420215539440

    image-20250420215602903

3.3 案例演示
  • 不使用Lazy Queue的情况

  • 关闭消息确认

    spring:
      rabbitmq:
        # 开启消息发送失败返回通知
        publisher-returns: false
        # 消息发送确认模式
        # none 关闭confirm机制 simple 同步阻塞等待MQ的回执消息 correlated MQ异步回调方式返回回执消息
        publisher-confirm-type: none
    
  • 发布者代码

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void notLazyQueue(){
            // 创建一个消息对象
            Message message = MessageBuilder.
                    // 设置消息体,也就是设置消息,并设置字符集编码
                    withBody("hello not lazy queue".getBytes(StandardCharsets.UTF_8))
                    // 设置消息是否持久化
                    // 临时消息:在内存中存储,会到达内存上限,然后会出现pageOut现象(内存数据转移到磁盘)
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
            // 循环
            for (int i = 1; i <= 1000000; i++) {
                // 发送消息
                rabbitTemplate.convertAndSend("simple.queue", message);
            }
    
    
    
        }
    
    • RabbitMQ控制台数据图

    image-20250420220700807

  • 使用Lazy Queue的情况

  • 创建Lazy Queue队列

    image-20250420221951491

  • 发布者代码

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void lazyQueue(){
            // 创建一个消息对象
            Message message = MessageBuilder.
                    // 设置消息体,也就是设置消息,并设置字符集编码
                            withBody("hello not lazy queue".getBytes(StandardCharsets.UTF_8))
                    // 设置消息是否持久化
                    // 临时消息:在内存中存储,会到达内存上限,然后会出现pageOut现象(内存数据转移到磁盘)
                    // 如果发送消息到惰性队列,那么消息的持久化属性会被忽略,然后不会因为pageOut而拒绝服务(不发消息)
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
    
            // 循环
            for (int i = 1; i <= 1000000; i++) {
                // 发送消息
                rabbitTemplate.convertAndSend("lazy.queue", message);
            }
    
    
    
        }
    
  • RabbitMQ控制台数据情况

    image-20250420221816442

  • 也可以通过配置类创建Lazy Queue

    package com.leon.rabbitmq.publisher.config;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * ClassName:RabbitMQConfig
     * Package:com.leon.rabbitmq.publisher.config
     * Description:
     *
     * @Author: leon
     * @Version: 1.0
     */
    @Configuration
    public class RabbitMQConfig {
    
        @Bean
        public Queue queue(){
            return QueueBuilder
                    // 设置队列名称
                    .durable("lazy.queue")
                    // 设置成Lazy队列
                    .lazy()
                    // 创建
                    .build();
        }
    }
    
  • 也可以通过注解设置Lazy队列

    // 设置惰性队列
    @RabbitListener(queuesToDeclare ={
            // 设置队列
            @Queue(value = "lazy.queue",
                    // 是否持久化
                    durable = "true",
                    // 参数设置 
                    arguments = @Argument(name = "x-queue-mode",value = "lazy"))} )
    public void readLazyQueue(String message){
        System.out.println("接收到来之Lazy.queue队列的消息: ====>" + message);
    }
    
  • RabbitMQ如何保证消息的可靠性

    • 首先通过配置可以让交换机、队列、以及发送消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
    • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为队列的默认模式,LazyQueue会将所有消息都持久化。
    • 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。

八、消费者的可靠性

1. 消费者确认机制

1.1 消费者确认机制

  • 为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息。

  • nack:消息处理失败,RabbitMQ需要再次投递消息。

  • reject:消息处理失败并不拒绝该消息,RabbitMQ从队列中删除该消息

    image-20250420230202973

  • Spring AMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除,非常不安全,不建议使用。

  • manual:手动模式。需要自己在业务代码中调用API,发送ack或reject,存在业务入侵,但更灵活。

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject
  • 对消费者配置文件application.yml进行配置

    spring:
    rabbitmq:
      listener:
        # 简单模式
        simple:
          # 设置每次重消息队列中获取的消息数量为1,消费完之后再取
          prefetch: 1
          # 开启消费者确认机制
          # none 关闭 manual 手动模式 auto 自动模式
          acknowledge-mode: none
    
  • 案例演示

  • none —-> 抛出MessageConversionException消息转换异常,被reject

    @RabbitListener(queues = "confirm.queue")
    public void readConsumerConfirm(String message){
        System.out.println("消费了confirm.queue中的消息 ====> " + message);
    
        // 抛出消息转换异常,返回reject,消息会被删除
        throw new MessageConversionException("消息格式转换异常");
    
        // 抛出业务异常,返回nack,消息会被重新入队,前提是非none模式
        //throw new RuntimeException("业务异常");
    
    }
    
  • auto —-> 抛出业务异常;消息将被保留在队列

    @RabbitListener(queues = "confirm.queue")
    public void readConsumerConfirm(String message){
        System.out.println("消费了confirm.queue中的消息 ====> " + message);
    
        // 抛出消息转换异常,返回reject,消息会被删除
        //throw new MessageConversionException("消息格式转换异常");
    
        // 抛出业务异常,返回nack,消息会被重新入队,前提是非none模式
        throw new RuntimeException("业务异常");
    
    }
    
    • 控制台效果图

    image-20250420233113860

2. 失败重试机制

2.1 失败重试机制

  • 当消费者出现异常后,消息会不断requeue(重入队)到队列,重新发送给消费者。如果消费者再次执行依然出错,消费者会再次requeue到队列,再次投递,直到消息处理成功为止。会引起MQ服务不必要压力。

  • 为了应对不断重新入队的情况,Spring又提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列

  • 基本配置

    spring:
    rabbitmq:
      listener:
        # 简单模式
        simple:
          retry:
            # 开启超时重试机制
            enabled: true
            # 失败后的初始等待时间
            initial-interval: 1000ms
            # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
            multiplier: 1
            # 最大重试次数,默认是3次
            max-attempts: 3
    
  • 案例演示

  • 发布者代码

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testConfirmQueue(){
    
            rabbitTemplate.convertAndSend("confirm.queue","hello");
    
        }
    
  • 消费者代码

    @RabbitListener(queues = "confirm.queue")
    public void readConsumerConfirm(String message){
        System.out.println("消费了confirm.queue中的消息 ====> " + message);
    
        // 抛出消息转换异常,返回reject,消息会被删除
        //throw new MessageConversionException("消息格式转换异常");
    
        // 抛出业务异常,返回nack,消息会被重新入队,前提是非none模式
        throw new RuntimeException("业务异常");
    
    }
    

3. 消费失败处理

3.1 失败消息处理策略

  • 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要MessageRecoverer接口来处理,它包含三种不同的实现

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    image-20250421163236460

3.2 案例实现

  • 将失败消息策略改为RepublishMessageRecoverer

  • 首先创建配置类,然后创建交换机、消息队列并绑定交换机

    package com.leon.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * ClassName:ErrorMessageConfig
     * Package:com.leon.rabbitmq.config
     * Description:
     *
     * @Author: leon
     * @Version: 1.0
     */
    @Configuration
    // 需要在开启超时重试机制后,才加载该配置类
    @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled",havingValue = "true")
    public class ErrorMessageConfig {
    
        // 创建一个error.queue消息队列
        @Bean
        public Queue errorQueue(){
            return new Queue("error.queue",true);
        }
    
        // 创建一个error.direct的定向交换机
        @Bean
        public DirectExchange errorExchange(){
            return new DirectExchange("error.direct");
        }
    
        // error.direct交换机于error.queue进行绑定
        @Bean
        public Binding errorBinding(DirectExchange errorExchange,Queue errorQueue){
    
            return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    
        }
    
        // 声明失败消息处理
        @Bean
        public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
            return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
        }
    }
    
  • RabbitMQ控制台结果

    image-20250421165624027

3.3 总结

  • 消费者如何保证消息一定被消费?
  • 开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。

4. 业务幂等性

4.1业务幂等性

  • 幂等是一个属性概念,用函数表达来描述是这样的:f(x)=f(f(x))。再程序开发中,则是指同一个业务,执行一次或多次对业务状态度的影响是一致的。
  • 幂等
    • 查询业务,例如根据id查询商品
    • 删除业务,例如根据id删除商品
  • 非幂等
    • 用户下单业务,需要扣减库存
    • 用户退款业务,需要恢复余额

4.2 解决方案

  • 唯一消息id

  • 方案一,是给每一个消息都设置一个唯一id,利用id区分是否重复消息:

    • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
    • 消费者接收到消息后处理自己的业务,业务处理成后将消息ID保存到数据库
    • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
  • 注意

    • 如果消息量后面越来越大:1000W—->2000W?
    • 使用数据迁移到历史表
    • 删除数据,数据因为具有时效性,过了失效性则进行删除,减少积压
  • 业务判断

  • 方案二

    • 是结合业务逻辑,基于业务本身判断。以支付业务为例:需要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

    image-20250421171245234

  • 最后的方案(兜底方案)

  • 利用各种机制尽可能增加消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?

    • 主动查询

    image-20250421171511542

4.3 总结

  • 如何保证支付服务与交易服务之间的订单状态一致性
  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常
  • 如果交易服务消息处理失败,有没有什么兜底方案
  • 我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

十、延迟消息

  • 延迟消息:
  • 生产者发送消息时指定一个时间,消费者不会立刻接收到消息,而是在指定时间后才收到消息
  • 延迟任务:
  • 设置在一定时间之后才执行的任务

image-20250421172632146

1. 死信交换机

1.1 死信交换机

  • 当一个队列中的消息满足下列情况下之一时,就会成为死信(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或者消息本身设置过期时间),超时无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

  • 如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信会投递到这个交换机中。这个交换机称之为死信交换机(Dead Letter Exchange,简称DLX)

    image-20250421173334927

1.2 案例实现

  • 实现如图效果,且Routing Key为空

  • 需要在simple.queue2创建时指定死信交换机

    image-20250421173604579

    image-20250421175130839

  • 代码实现

  • 发布者代码

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testDlxDirect(){
    
            System.out.println("发送时间: " + LocalTime.now());
    
            rabbitTemplate.convertAndSend("simple.direct", "", "hello dlx direct", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    // 设置过期时间
                    message.getMessageProperties().setExpiration("5000");
                    return message;
                }
            });
    
        }
    
  • 消费者代码

    @RabbitListener(queues = {"dlx.queue"})
    public void readDlxQueue(String message)  {
        System.out.println("消费了dlx.queue中的消息 ====> " + message);
        System.out.println("现在的时间 ======= "+ LocalTime.now());
    }
    

2. 延迟消息插件

  • 官方文档说明:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

2.1 下载

  • 下载网址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags

image-20250421180649169

image-20250421180629422

2.2 安装

  • 因为是使用Docker安装的RabbitMQ,通过指令查询插件目录挂载在哪个目录下

    docker volume inspect 卷名称
    
  • 然后将下载的插件放入到指定的目录中

  • 进入容器中,然后运行指令

    docker exec -it rabbitmq bash
    rabbitmq-plugins list 
    
  • 查看有哪些插件

    image-20250421184909579

  • 然后启用插件(外部和内部启用)

    // 直接再外部启用插件
    docker exec -it 容器名称 rabbitmq-plugins enable 插件名称
    // 进入容器启用
    rabbitmq-plugins enable 插件名称
    
  • 然后成功

    image-20250421185023045

  • 再重启镜像

    docker restart 镜像名称或ID
    

2.3 案例演示

  • 使用注解实现延迟消息

    // 延迟消息
      @RabbitListener(bindings = {@QueueBinding(
              // 设置队列
              value = @Queue(value = "delay.queue",durable = "true"),
              // 设置交换机
              exchange = @Exchange(value = "delay.direct",type = ExchangeTypes.DIRECT,delayed = "true"),
              // 设置路由键
              key = "delay")})
      public void readDelayExchange(String message){
          System.out.println("消费了delay.queue中的消息 ====>"+message+ " 时间为: ==== " + LocalTime.now());
      }
    
  • 使用配置类实现延迟消息

package com.leon.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ClassName:DelayExchangeConfig
 * Package:com.leon.rabbitmq.config
 * Description:
 *
 * @Author: leon
 * @Version: 1.0
 */
@Configuration
public class DelayExchangeConfig {

    // 设置队列
    @Bean
    public Queue delayQueue() {

        return new Queue("delay.queue",true);

    }

    // 创建延时交换机
    @Bean
    public DirectExchange delayExchange() {
        // 创建交换机
        DirectExchange directExchange = new DirectExchange("delay.direct",true,false);
        // 设置为延时交换机
        directExchange.setDelayed(true);

        return directExchange;
    }

    // 绑定交换机和消息队列
    @Bean
    public Binding delayBinding(DirectExchange delayExchange, Queue delayQueue) {

        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay");

    }
}
  • 发布者代码

    @SpringBootTest
    public class PublisherApplicationTest2 {
    
      @Autowired
      private RabbitTemplate rabbitTemplate;
    
      @Test
      public void testDelayDirect(){
          // 发送消息
          rabbitTemplate.convertAndSend("delay.direct", "delay", "hello delay direct", new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  // 设置延时时间
                  message.getMessageProperties().setDelay(5000);
                  return message;
              }
          });
    
    
      }
    

Comment