精通Spring Boot 3 : 10. 使用 Spring Boot 进行消息通信 (7)
使用 Spring Boot 的 RSocket
RSocket 是最新的消息传递技术之一。这种技术是一种应用协议,支持通过 TCP、WebSocket 和流传输进行多路复用的双向通信。RSocket 具有多种交互模型:
- 请求与响应:你发送一条消息,随后会收到回复。
- 你发送一条消息,就会收到一系列回复。
- 一次性发送:你发出一条消息后,不会收到任何回复。
- 渠道:消息流在两个方向上发送。
RSocket 协议的一些关键特性包括反应式流语义、请求限流、会话恢复、大消息的分片与重组,以及保持活动帧(心跳)。在与 Spring 一起使用 RSocket 时,spring-messaging 模块提供了 RSocketRequester 和带有@MessageMapping 注解的响应者。
在本节中,我们将仅使用请求/响应交互模型来演示与我们的两个应用程序建立交互的另一种方式。让我们开始吧,看看将 RSocket 技术轻松集成到用户应用和我的复古应用中是多么简单。
使用 RSocket 和 Spring Boot 开发用户应用程序
你可以在 10-messaging-rsocket/users 文件夹中找到所有源代码。我们将使用前几章中的 WebFlux 代码。如果你是从 Spring Initializr (https://start.spring.io)开始,请将 Group 字段设置为 com.apress,将 Artifact 和 Name 字段都设置为 users,并添加 RSocket、WebFlux、Validation、R2DBC、H2 和 Lombok 依赖项。生成并下载项目后,解压缩并导入到你喜欢的 IDE 中。
让我们从 build.gradle 文件开始,它应该与清单 10-40 相似。
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.3'
id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
runtimeOnly 'io.r2dbc:r2dbc-h2'
runtimeOnly 'org.postgresql:r2dbc-postgresql'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// Web
implementation 'org.webjars:bootstrap:5.2.3'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
tasks.named('test') {
列表 10-40 的 build.gradle 文件
在 build.gradle 文件中,我们添加了 spring-boot-starter-webflux、spring-boot-starter-rsocket 和 r2dbc-h2(数据库)依赖。
接下来,打开或创建 UserRSocket 类。请参阅清单 10-41。
package com.apress.users.rsocket;
import com.apress.users.model.User;
import com.apress.users.service.UserService;
import lombok.AllArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@Controller
public class UserRSocket {
UserService userService;
@MessageMapping("new-user")
Mono<User> newUser(final User user){
return this.userService.saveUpdateUser(user);
}
@MessageMapping("all-users")
Flux<User> getAllUsers(){
return this.userService.getAllUsers();
}
@MessageMapping("user-by-email")
Mono<User> findUserByEmail(final String email){
return this.userService.findUserByEmail(email);
}
@MessageMapping("remove-user-by-email")
Mono<Void> removeUserByEmail(final String email){
this.userService.removeUserByEmail(email);
return Mono.empty();
}
}
文件路径:列表 10-41 src/main/java/com/apress/users/rsocket/UserRSocket.java
UserRSocket 类中一个重要的特性是 @MessageMapping 注解,它在 RSocket 应用中充当响应者;这与你熟悉的注解(如 @GetMapping、@PostMapping 等)类似。使用这个注解时,你需要定义一个目标,并且可以接受更多参数,例如 java.security.Principal、@Header、@Payload 等。被注解的方法可以返回 STOMP over WebSocket 的值,或者使用 @SendTo 或 @SendToUser 注解的自定义定义。在这种情况下,我们使用 Mono 和 Flux 类型。
接下来,打开 application.properties 文件,内容应与列表 10-42 类似。
# RSocket
spring.rsocket.server.port=9898
# R2DBC
spring.r2dbc.properties.initialization-mode=always
spring.r2dbc.generate-unique-name=false
spring.r2dbc.name=retro-db
# Logging
logging.level.org.springframework.r2dbc=DEBUG
列表 10-42 源文件:src/main/resources/application.properties
在 application.properties 文件中,重要的属性是 spring.rsocket.server.port,对应的端口是 9898。
就这样!你不需要其他任何东西。基本上,当 Spring Boot 找到 spring-boot-starter-rsocket 依赖时,它会自动配置 RSocketServer、处理器、消息转换器等,使用默认值,并根据 @MessageMapping 注解的响应者注册所有目标。
使用 RSocket 和 Spring Boot 启动用户应用程序
现在,您可以运行用户应用程序了。您不需要做任何特别的事情。最重要的信息都在日志中。因此,一旦您运行应用程序(无论是通过命令行使用 ./gradlew bootRun 还是通过您的 IDE),请查看日志。您应该会看到类似以下的内容:
...
--- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080
--- [ main] o.s.b.rsocket.netty.NettyRSocketServer : Netty RSocket started on port(s): 9898
--- [ main] com.apress.users.UsersApplication : Started UsersApplication in 2.576 seconds
请记住,这是一个 WebFlux 应用程序,因此默认情况下由 Spring Boot 配置并使用 Netty 服务器。此外,正在启动一个 NettyRSocketServer,并且它正在监听 9898 端口。
通过命令行添加用户:
curl -i -s -d '{"name":"Dummy","email":"dummy@email.com","password":"aw2s0meR!","userRole":["INFO"],"active":true}' \
-H "Content-Type: application/json" \
http://localhost:8080/users
curl -i -s -d '{"name":"Ximena","email":"ximena@email.com","password":"aw2s0meR!","userRole":["INFO","ADMIN"],"active":true}' \
-H "Content-Type: application/json" \
http://localhost:8080/users
现在你可以开始请求 RSocket 消息了。
在我的复古应用中请求 RSocket 消息
现在,让我们看看如何将 RSocket 技术集成到 My Retro App 中。如果您已经下载了代码,请查看 10-messagin-rsocket/myretro 文件夹。如果您是从 Spring Initializr (https://start.spring.io) 开始的,请将 Group 字段设置为 com.apress,将 Artifact 和 Name 字段都设置为 myretro,并添加 RSocket、WebFlux、Validation、Mongo Reactive、Docker Compose 和 Lombok 这些依赖项。生成并下载项目,解压缩后导入到您喜欢的 IDE 中。
让我们先从 build.gradle 文件开始。请查看列表 10-43。
plugins {
id 'java'
id 'org.springframework.boot' version '3.1.5'
id 'io.spring.dependency-management' version '1.1.3'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
tasks.named('test') {
useJUnitPlatform()
}
列表 10-43 build.gradle
列表 10-43 显示我们正在使用 spring-boot-starter-rsocket 这个依赖。
接下来,打开或创建用户类和用户角色类,具体内容见清单 10-44 和 10-45。
package com.apress.myretro.rsocket;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
@NoArgsConstructor
@AllArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
UUID id;
private String email;
private String name;
private String gravatarUrl;
private Collection<UserRole> userRole = Collections.emptyList();
private boolean active;
}
10-44 src/main/java/com/apress/myretro/rsocket/User.java
package com.apress.myretro.rsocket;
public enum UserRole {
USER, ADMIN, INFO
}
10-45 src/main/java/com/apress/myretro/rsocket/UserRole.java
正如您所见,用户类和用户角色类与用户应用中的完全相同。
接下来,打开或创建 UserClient 接口。请参阅清单 10-46。
package com.apress.myretro.rsocket;
import org.springframework.messaging.rsocket.service.RSocketExchange;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@Component
public interface UserClient {
@RSocketExchange("all-users")
Flux<User> getAllUsers();
}
10-46 src/main/java/com/apress/myretro/rsocket/UserClient.java
用户客户端接口通过 @RSocketExchange 注解标记一个方法,该注解定义了 RSocket 端点 all-users。在后台,得益于 Spring Boot,这个端点能够连接到 RSocket 服务器并查找该目标(响应者),在这里是用户应用程序。目前,我们只会连接到一个目标;您可以对其他任何目标执行相同的操作(@MessageMapping)。
接下来,打开或创建 Config 类。请参阅列表 10-47。
package com.apress.myretro.rsocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.service.RSocketServiceProxyFactory;
@Slf4j
@Configuration
public class Config {
@Bean
public RSocketServiceProxyFactory getRSocketServiceProxyFactory(RSocketRequester.Builder requestBuilder, @Value("${myretro.users-service.host:localhost}")String host, @Value("${myretro.users-service.port:9898}")int port) {
RSocketRequester requester = requestBuilder.tcp(host, port);
return RSocketServiceProxyFactory.builder(requester)
.build();
}
@Bean
public UserClient getClient(RSocketServiceProxyFactory factory) {
return factory.createClient(UserClient.class);
}
@Bean
CommandLineRunner commandLineRunner(UserClient userClient) {
return args -> {
userClient.getAllUsers().doOnNext(
user -> log.info("User: {}", user)
).subscribe();
};
}
}
10-47 src/main/java/com/apress/myretro/rsocket/Config.java
让我们来回顾一下 Config 类
- RSocketRequester.Builder:该接口由 Spring Boot 自动配置。它是一个 RSocket 包装器,能够发送和接收对象,并具备路由和准备其他有用元数据的能力。在此情况下,我们使用 myretro.user-service.*属性的值(以获取 RSocket 服务器的主机和端口)来创建 RSocketRequester 对象,该对象可以生成我们所需的工厂。
- RSocketServiceProxyFactory:这是一个用于创建基于 RSocket 服务接口的客户端代理的工厂,该接口与@RSocketExchange 方法(UserClient 接口)配合使用。
- 用户客户端:这个接口是一个 RSocket 服务。在后台,有一个实现负责处理与 RSocket 服务器的连接、发送和接收等所有工作。
请注意,我们正在使用 CommandLineRunner,这意味着当应用程序准备好时,它会执行代码。在这种情况下,它将使用 UserClient#getAllUsers 方法向 Users App 中的 all-users 目标发送请求。
接下来,打开如清单 10-48 所示的 application.properties 文件。
# MongoDB
#spring.data.mongodb.uri=mongodb://retroadmin:aw2s0me@127.0.0.1:27017/retrodb?directConnection=true&serverSelectionTimeoutMS=2000&authSource=admin&appName=mongosh+1.7.1
#spring.data.mongodb.database=retrodb
# App
server.port=8081
# Users Service
myretro.users-service.host=localhost
myretro.users-service.port=9898
列表 10-48 源文件:src/main/resources/application.properties
在 application.properties 文件中,我们定义了 users-service 属性,以指明 RSocket 服务器的运行位置,此处为同一台机器(localhost)和端口 9898。
最后,我们需要添加 docker-compose.yaml 文件。请记住,我们使用 spring-boot-docker-compose 依赖,这样它就可以启动 MongoDB 服务。请参见清单 10-49。
version: "3.1"
services:
mongo:
image: mongo
restart: always
environment:
MONGO_INITDB_DATABASE: retrodb
ports:
- "27017:27017"
列表 10-49 的 docker-compose.yaml 文件
在我的复古应用中使用 Spring Boot 启动 RSocket 请求者
现在是时候在我的复古应用中运行 RSocket 请求者了。您可以通过命令行或在您的 IDE 中进行。运行后,您应该会看到以下输出:
...
--- User: User(id=7d8f43c1-4911-4945-8a5d-301b365e82ce, email=dummy@email.com, name=Dummy, gravatarUrl=https://www.gravatar.com/avatar/fb651279f4712e209991e05610dfb03a?d=wavatar, userRole=[INFO], active=true)
--- User: User(id=2074203e-add0-40ca-95c2-82b2a23e1f13, email=ximena@email.com, name=Ximena, gravatarUrl=https://www.gravatar.com/avatar/f07f7e553264c9710105edebe6c465e7?d=wavatar, userRole=[INFO, ADMIN], active=true)
...
正如您所看到的,我们正在使用 RSocket 请求所有用户的目标!
其他消息传递框架
在本书的 Apress GitHub 仓库 (https://www.apress.com/gp/services/source-code) 或我的个人 GitHub 仓库 (https://github.com/felipeg48/pro-spring-boot-3rd) 中,您可以找到更多关于消息框架的示例,如 Kafka、Redis 等。要全面覆盖这些内容将需要另一本书。因此,请继续关注该仓库中的更多代码和功能。我认为,开发这类应用程序最重要的因素是,它体现了 Spring 创建应用程序的方式,因为它使您能够轻松地使用 Spring/Spring Boot 来构建任何消息解决方案,采用这种编程风格。
概要
在本章中,您学习了不同的消息传递技术,这些技术可以帮助您构建同步和异步的消息系统。您了解到 Spring 消息的核心在于 spring-messaging 模块,当与 Spring Boot 结合使用时,自动配置会设置所有默认选项,包括连接、会话和重连处理、消息转换等。
此外,您了解到 spring-messaging 模块包含接口和实现,例如在各种技术中实现的模板设计模式,提供了具体的类实现,如 JmsTemplate、RabbitTemplate、RedisTemplate 和 PulsarTemplate(以及更多),这些都帮助您专注于业务逻辑,而无需花费时间在底层技术上,从而使开发变得更加轻松。