精通Spring Boot 3 : 10. 使用 Spring Boot 进行消息通信 (4)

基于 Spring Boot 的 AMQP 方案

早期的消息传递协议,如 Sun Microsystems、Oracle、IBM 和 Microsoft 开发的协议,都是专有的,这使得不同技术或编程语言的混合变得困难。

针对这一挑战,摩根大通的团队开发了高级消息队列协议(AMQP)。AMQP 是一种开放标准的应用层协议,专为面向消息的中间件(MOM)设计。AMQP 是一种底层协议,这意味着应用程序可以使用任何技术或编程语言与兼容 AMQP 的消息代理进行通信。

目前有很多不同的 AMQP 消息代理,但 RabbitMQ 是最受欢迎的选择之一。RabbitMQ 使用简单、易于扩展且速度非常快,因此我们将在本节中使用它。

安装 RabbitMQ 消息队列

在我们讨论 RabbitMQ 之前,您可以选择安装它(这不是必须的),尽管我们将使用 Docker Compose 来进行实验。如果您使用的是 macOS 或 Linux,可以使用 brew 命令:

$ brew upgrade
$ brew install rabbitmq

如果您使用的是其他 UNIX 系统或 Windows 系统,可以访问 RabbitMQ 网站并下载安装程序(
https://www.rabbitmq.com/docs/download)。RabbitMQ 是用 Erlang 编写的,因此主要依赖于在您的系统中安装 Erlang 运行时。现在,所有 RabbitMQ 安装程序都包含了所需的 Erlang 依赖项。请确保可执行文件在您的 PATH 变量中(具体取决于您使用的操作系统是 Windows 还是 Linux)。如果您使用 brew,则无需担心设置 PATH 变量。

RabbitMQ/AMQP:交换、绑定与队列

AMQP 定义了三个概念,这些概念与 JMS 世界略有不同,但非常容易理解。AMQP 定义了交换机,消息会发送到这些交换机。每个交换机接收一条消息,并将其路由到零个或多个队列。这个路由过程基于交换机的类型和称为绑定的规则,使用特定的算法进行处理。

AMPQ 定义了五种交换类型(除了默认类型):直接交换、扇出交换、主题交换、一致性哈希交换和头部交换。图 10-7 展示了这些不同的交换类型。


因此,AMPQ 的核心思想是将消息发送到交换机,并附带一个路由键。然后,交换机会根据其类型将消息投递到队列(如果路由键不匹配,则不会投递)。

默认交换会自动绑定到每个创建的队列。直接交换通过路由键与队列绑定;你可以将这种交换类型视为一对一的绑定。主题交换与直接交换类似;唯一的区别在于在绑定中可以在路由键中添加通配符(你可以使用 * -星号-,它恰好匹配路由键中的一个单词,或者使用 # -井号-,它可以匹配路由键中的零个或多个单词,或其他任意组合)。头交换与主题交换类似;唯一的区别在于绑定是基于消息头(这是一个非常强大的交换方式,你可以对其头部进行各种表达)。 扇出交换机会将消息复制到所有绑定的队列,可以将其视为消息广播。一致性哈希交换机是一种特殊的交换机类型,使用一致性哈希算法将消息分发到多个队列。这确保了相同路由键的消息始终发送到同一队列,即使在添加或删除队列的情况下也不会改变。这使得在特定组(由路由键定义)内保持消息顺序的同时,能够有效地扩展消费者。

您可以在
https://rabbitmq.com/tutorials/amqp-concepts.html 上找到关于这些概念和交换类型的更多信息。

在用户应用中使用 RabbitMQ 的 AMQP 协议

现在是时候在用户应用中利用 RabbitMQ 的强大功能了。您可以从 10-messaging-rmq/users 文件夹下载源代码,或者如果您使用 Spring Initializr(https://start.spring.io),请确保选择 RabbitMQ、Web、JPA、验证、Docker Compose、Lombok、H2 和 PostgreSQL 作为依赖项。将组字段设置为 com.apress,将工件和名称字段设置为 users,生成项目后下载,解压缩并导入到您喜欢的 IDE 中。

打开 build.gradle 文件,内容应类似于清单 10-17。

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.3'
    id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
    mavenCentral()
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'com.h2database:h2'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    // Web
    implementation 'org.webjars:bootstrap:5.2.3'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
tasks.named('test') {
    useJUnitPlatform()
}

10-17 build.gradle

这里的重要依赖是 spring-boot-amqp 和 jackson-datatype-jsr310,它们将帮助我们处理 LocalDateTime 的序列化与反序列化。

当 Spring Boot 启动时,它会识别您已添加 AMQP 依赖,并自动配置所有必要的默认 Spring Bean,以便帮助您连接到 RabbitMQ 代理。

接下来,打开或创建事件包,以及我们将发送到 RabbitMQ 的 UserActivatedEvent 和 UserRemovedEvent 类。它们与前面章节中的相应类类似,但我想向您展示更多的选择;这些类的配置方式将取决于您的业务逻辑。请参见清单 10-18 和 10-19。

package com.apress.users.events;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserActivatedEvent {
    private final String action = "ACTIVATION_STATUS";
    private String email;
    private boolean active;
}

10-18 src/main/java/apress/com/users/events/UserActivatedEvent.java

package com.apress.users.events;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserRemovedEvent {
    private final String action = "REMOVED";
    private String email;
    @JsonFormat(shape=JsonFormat.Shape.STRING,pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime removed;
}

10-19 src/main/java/apress/com/users/events/UserRemovedEvent.java

正如你所看到的,我们现在在两个类中将动作作为常量添加,这是一种简单的方式来避免根据名称来查找动作,和之前的做法不同。

向 RabbitMQ 发送消息

让我们来看看如何将消息发布到 RabbitMQ。请记住,发布者需要以下步骤:打开与 RabbitMQ 代理的连接,创建一个通道,并将消息发送到交换机。幸运的是,使用 spring-amqp 和 Spring Boot,这些步骤变得更加简单。

接下来,打开或创建 UserLogs 类。请参阅列表 10-20。

package com.apress.users.events;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_ACTIVATED;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_REMOVED;
@Slf4j
@AllArgsConstructor
@Component
public class UserLogs {
    private RabbitTemplate rabbitTemplate;
    @Async
    @EventListener
    void userActiveStatusEventHandler(UserActivatedEvent event){
        this.rabbitTemplate.convertAndSend(USERS_ACTIVATED,event);
        log.info("User {} active status: {}",event.getEmail(),event.isActive());
    }
    @Async
    @EventListener
    void userDeletedEventHandler(UserRemovedEvent event){
        this.rabbitTemplate.convertAndSend(USERS_REMOVED,event);
        log.info("User {} DELETED at {}",event.getEmail(),event.getRemoved());
    }
}

10-20 src/main/java/apress/com/users/events/UserLogs.java

列表 10-20 显示,UserLogs 类与前一节中的配置相似(见列表 10-7),但在这种情况下我们使用 RabbitTemplate 类。该类实现了模板设计模式,简化了连接代理、会话管理、消息转换(基于 Spring Boot 自动配置的 SimpleMessageConverter)、错误时的重新连接、重试、连接池、通道等的繁琐代码。

在这个例子中,RabbitTemplate 使用了 convertAndSend 方法,路由键作为第一个参数(可以是 users.activated 或 users.removed),事件作为第二个参数。这个方法有多种重载,您可以选择最适合您业务逻辑的版本。

我之前提到过,发布者总是连接到一个交换机,而在这个类中指定了一个交换机,但它将在 RestTemplate 的配置中进行声明,这样使用 convertAndSend 方法会更方便。

接下来,打开或创建 UserRabbitConfiguration 类。请参阅清单 10-21。

package com.apress.users.amqp;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UserRabbitConfiguration {
    public static final String USERS_EXCHANGE = "USERS";
    public static final String USERS_STATUS_QUEUE = "USER_STATUS";
    public static final String USERS_REMOVED_QUEUE = "USER_REMOVED";
    public static final String USERS_ACTIVATED = "users.activated";
    public static final String USERS_REMOVED = "users.removed";
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("USERS");
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper().registerModule(new JavaTimeModule())));
        return rabbitTemplate;
    }
    @Bean
    TopicExchange exchange(){
        return new TopicExchange(USERS_EXCHANGE);
    }
    @Bean
    Queue userStatusQueue(){
        return new Queue(USERS_STATUS_QUEUE,true,false,false);
    }
    @Bean
    Queue userRemovedQueue(){
        return new Queue(USERS_REMOVED_QUEUE,true,false,false);
    }
    @Bean
    Binding userStatusBinding(){
        return new Binding(USERS_STATUS_QUEUE,Binding.DestinationType.QUEUE,USERS_EXCHANGE,USERS_ACTIVATED,null);
    }
    @Bean
    Binding userRemovedBinding(){
        return new Binding(USERS_REMOVED_QUEUE,Binding.DestinationType.QUEUE,USERS_EXCHANGE,USERS_REMOVED,null);
    }
}

10-21 src/main/java/apress/com/users/amqp/UserRabbitConfiguration.java

让我们来回顾一下 UserRabbitConfiguration 类

  • @Configuration:这个注解是一个标记,指示 Spring Boot 配置该类中声明的所有 @Bean。
  • 连接工厂:该接口从自动配置中收集所有必要的值,例如主机(RabbitMQ 代理所在的地址)、用户名、密码、虚拟主机以及任何监听器。实现该接口是 RabbitTemplate 类所必需的。
  • RabbitTemplate:这个类实现了打开连接(连接池)、通道(通道池)和重试(在与代理的连接丢失时)的所有必要逻辑,并且可以用来覆盖和设置新值。在这种情况下,我们设置了一个名为 USERS 的主题交换机。此外,我们还使用 Jackson2JsonMessageConverter 类来覆盖消息转换器。
  • Jackson2JsonMessageConverter:这个类有一个层次结构,扩展了一些抽象类并实现了 MessageConverter 接口。在这里,我们使用它将事件转换为 JSON 格式,并注册 JavaTimeModule 类模块,以处理 LocalDateTime 格式。
  • TopicExchange:这个类创建了一个持久的主题交换,允许我们创建一些可能包含正则表达式的键,并利用 RabbitMQ 默认提供的路由功能。
  • 队列:我们正在声明两个队列,USERS_STATUS 和 USERS_REMOVED。这些队列的属性设置为 durable 为 true(这意味着在崩溃或重启时,队列仍然会存在),exclusive 为 false(这意味着我们可以为这个队列使用多个消费者),autodelete 为 false(这意味着即使消费者与队列断开连接,队列仍然会保留)。
  • 绑定:如图 10-7 所示,每个交换机通过一个绑定连接到一个队列,该绑定可以根据交换机的不同而有或没有路由键。在这种情况下,我们为每个队列创建一个绑定,并分配路由键(users.activated 和 users.removed)。

需要注意的是,我们是通过编程方式创建交换机、队列和绑定的,但您也可以在 RabbitMQ 网页控制台中手动创建它们。

接下来,打开或添加 docker-compose.yaml 文件。请参见清单 10-22。

version: "3"
services:
  rabbitmq:
    container_name: rabbitmq
    hostname: rabbitmq
    image: rabbitmq:management-alpine
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"

文件 10-22docker-compose.yaml

使用 RabbitMQ 启动用户应用程序

要通过 RabbitMQ 运行用户应用程序,您可以使用命令行或您的 IDE。运行时,您将在控制台中看到以下内容:

...
Created new connection: rabbitConnectionFactory#7fff419d:0/SimpleConnection@6f5f892c [delegate=amqp://guest@127.0.0.1:5672/, localPort=63724]
UserLogs: User ximena@email.com active status: true
UserLogs: User norma@email.com active status: false
UserLogs: User dummy@email.com active status: false
UserLogs: User dummy@email.com DELETED at 2023-10-19T15:57:46.049020
...

让我们回顾一下刚才发生的事情。请打开浏览器,访问 http://localhost:15672,这是 RabbitMQ 的网页控制台。请参见图 10-8。



凭据为 guest/guest。登录后,您将看到如图 10-9 所示的 RabbitMQ 网页控制台界面。


在这里,您可以实时查看所有统计数据,了解所有传递的消息,获取消耗信息或其他重要属性,如内存、磁盘空间等。接下来,点击“交易”标签。请参见图 10-10。


默认情况下,已经预定义了多个交换。请注意,USERS Exchange(主题类型)已被声明。

接下来,点击“队列和流”选项卡。请参见图 10-11。


USER_STATUS 和 USER_REMOVED 队列都被定义为经典队列,这很好。还有一个法定队列,当你需要更高的可用性并进行集群时可以使用;但在我们目前的情况下,这并不是必需的。

接下来,点击“用户状态”队列以查看详细信息,如图 10-12 所示。


你应该已经有一些消息在这里。向下滚动页面并展开“获取消息”部分。点击“获取消息”按钮,你应该会看到类似于图 10-13 的内容。

底部的有效负载部分显示了转换为 JSON 格式的事件消息。在属性部分,请注意头字段中有一个名为__TypeId__的键,其值为
com.apress.users.event.UserActivatedEvent;这类似于 JMS 中的 TypeId,但在这里不需要显式声明,Spring AMQP 会默认生成它。

如果你查看 USER_REMOVED 队列中的消息,你应该会看到类似于图 10-14 的内容。

属性部分的标题字段和有效负载部分的消息表明我们已经具备所需的信息。

现在你可以关闭用户应用。

从 RabbitMQ 中接收消息

要从 RabbitMQ 消费消息,消费者需要按照以下步骤操作:首先打开与 RabbitMQ 代理的连接,然后创建一个通道,接着连接到一个队列,最后消费消息(可以选择自动确认或手动确认)。

我们正在以 JSON 格式处理事件,因此需要与 JMS 配置相同的设置:我们需要覆盖默认的监听器,并注册 JavaTimeModule 以便进行反序列化。

所以,让我们来创建 UserRabbitConfiguration 类的最终版本。请查看列表 10-23。

package com.apress.users.amqp;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UserRabbitConfiguration {
    public static final String USERS_EXCHANGE = "USERS";
    public static final String USERS_STATUS_QUEUE = "USER_STATUS";
    public static final String USERS_REMOVED_QUEUE = "USER_REMOVED";
    public static final String USERS_ACTIVATED = "users.activated";
    public static final String USERS_REMOVED = "users.removed";
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("USERS");
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper().registerModule(new JavaTimeModule())));
        return rabbitTemplate;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter(new ObjectMapper().registerModule(new JavaTimeModule())));
        return factory;
    }
}

文件
10-23src/main/java/apress/com/users/amqp/UserRabbitConfiguration.java

列表 10-23 显示我们移除了 Exchange、Queue 和 Binding 组件;我将向您展示另一种创建它们的方法。接下来,我们声明
SimpleRabbitListenerContainerFactory(与 JMS 版本非常相似),并使用熟知的 ConnectionFactory,同时注册 JavaTimeModule。

接下来,打开或创建 UserListeners 类。请参阅清单 10-24。

package com.apress.users.amqp;
import com.apress.users.events.UserActivatedEvent;
import com.apress.users.events.UserRemovedEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_ACTIVATED;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_EXCHANGE;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_REMOVED;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_REMOVED_QUEUE;
import static com.apress.users.amqp.UserRabbitConfiguration.USERS_STATUS_QUEUE;
@Component
@Slf4j
public class UserListeners {
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(name = USERS_STATUS_QUEUE, durable = "true", autoDelete = "false")
                    ,exchange = @Exchange(name=USERS_EXCHANGE,type = "topic"),key=USERS_ACTIVATED))
    public void userStatusEventProcessing(UserActivatedEvent activatedEvent){
        log.info("[AMQP - Event] Activated Event Received: {}", activatedEvent);
    }
    @RabbitListener(
            bindings = @QueueBinding(value = @Queue(name = USERS_REMOVED_QUEUE, durable = "true", autoDelete = "false")
                    ,exchange = @Exchange(name=USERS_EXCHANGE,type="topic"),key=USERS_REMOVED))
    public void userRemovedEventProcessing(UserRemovedEvent removedEvent){
        log.info("[AMQP - Event] Activated Event Received: {}", removedEvent);
    }
}

列表 10-24 源代码:
src/main/java/apress/com/users/amqp/UserListeners.java

UserListeners 类中包含以下注解:

  • @RabbitMQListener:这个注解创建了一个 RabbitListener,除非被覆盖,否则将使用默认实现;在这种情况下,我们正在创建 SimpleRabbitListenerContainerFactory,它将使用我们自己的 MessageConverter 配置(这里是 Jackson2JsonMessageConverter)。通过这个注解,您可以定义队列、交换机和绑定(通过其他注解),使代码更加简洁易读,因为您清楚监听器是从哪里获取消息的。
  • @QueueBinding:这个注解允许您创建交换机与队列之间的绑定。其一个好处是,如果队列、交换机和绑定不存在,这个注解会尝试自动创建它们。
  • @Queue: 此注解允许您在消息代理中声明一个队列,并设置一些属性,例如持久性和自动删除等。如果该队列不存在,系统将尝试创建它。
  • 该注解在代理中声明了一个带有特定属性的交换。如果代理中不存在该交换,将会尝试创建它。

请注意,如果您出于某种原因更改了队列的某个属性(例如,第一次将 durable 设置为 true,第二次设置为 false),队列将会失败。因此,请仔细设计您将使用的交换类型、路由键以及队列的类型和属性,以避免出现任何错误。

使用 RabbitMQ 运行用户应用程序以处理消息

在使用 RabbitMQ 运行用户应用时,您将拥有一个发布者和多个消费者,并将得到以下输出:

...
User ximena@email.com active status: true
User norma@email.com active status: false
User dummy@email.com active status: false
[AMQP - Event] Activated Event Received: UserActivatedEvent(action=ACTIVATION_STATUS, email=ximena@email.com, active=true)
[AMQP - Event] Activated Event Received: UserActivatedEvent(action=ACTIVATION_STATUS, email=norma@email.com, active=false)
[AMQP - Event] Activated Event Received: UserActivatedEvent(action=ACTIVATION_STATUS, email=dummy@email.com, active=false)
User dummy@email.com DELETED at 2023-10-19T17:27:26.442656
[AMQP - Event] Activated Event Received: UserRemovedEvent(action=REMOVED, email=dummy@email.com, removed=2023-10-19T17:27:26)
...

太棒了!现在你已经知道如何使用 RabbitMQ 进行消息传递了。当然,这只是你能用它做的一小部分。继续阅读,了解更多其他功能。