精通Spring Boot 3 : 7. Spring Boot 响应式 (1)
在本章中,我们将讨论 Spring 框架如何利用 Project Reactor(https://projectreactor.io/)的强大功能来构建数据和 Web 反应式应用程序,以及 Spring Boot 如何通过其自动配置功能帮助我们轻松地将所有组件连接起来,从而创建出色的反应式应用程序。Spring 框架在 5.0 版本中引入了反应式技术,并在 6.x 版本中进行了进一步集成,以提供多项改进。
Spring Web Flux 和 Spring Reactive Data 是完全非阻塞的框架,基于支持反应式流背压(流控制)的 Project Reactor,并可在 Netty、Undertow 和 servlet 容器等服务器上运行。让我们一起探索反应式系统以及 Project Reactor 是如何实现这些系统的。
响应式系统
在过去十年里,软件行业为了满足移动和云计算的需求,改进了软件开发流程,生产出更稳定、更强大、更具弹性和更灵活的软件,以应对来自用户(使用桌面或网络)以及各种设备(如手机、传感器等)的现代需求。适应这些新工作负载面临许多挑战,因此一组组织共同努力,制定了一份宣言,以涵盖当今数据需求的多个方面。
反应式宣言
《反应式宣言》(https://www.reactivemanifesto.org/)于 2014 年 9 月 16 日发布,定义了反应式系统的特征。反应式系统具有灵活性、松散耦合和可扩展性。它们对故障的容忍度更高,当发生故障时,会通过应用相应的模式来处理,以避免灾难。
简而言之,反应式系统的特点如下:
- 响应式:反应系统在可能的情况下能够及时做出响应。它们专注于提供快速且稳定的响应时间,并设定可靠的上限,以确保服务质量的一致性。
- 弹性系统:反应式系统通过应用复制、隔离、封装和委托模式来实现弹性。它们通过隔离来控制故障,确保故障不会影响系统的其他部分或其他系统。恢复必须来自另一个系统,以确保高可用性(HA)。
- 弹性:反应式系统在任何工作负载下都能保持响应。它们可以通过增加或减少分配给处理这些输入的资源来应对输入速率的变化。它们不应存在任何瓶颈,这意味着它们能够对组件进行分片或复制,并将分布式输入分配给这些组件。反应式系统必须支持预测算法,以确保在普通硬件上实现成本效益的弹性。
- 消息驱动:反应式系统依赖异步消息传递来建立组件之间的边界,从而确保系统是松散耦合、相互隔离且位置透明的。反应式系统必须通过在需要时提供背压模式来支持负载管理、弹性和流量控制。通信必须是非阻塞的,以便接收者在活动时能够消耗资源,这样可以降低系统的开销。
在《反应式宣言》发布后,出现了许多不同的倡议,开始实施帮助全球开发者的框架和库。反应式流(https://www.reactive-streams.org/)是一项规范,定义了四个简单的接口:Publisher,一个提供无限数量顺序元素的提供者,根据订阅者的需求进行发布;Subscriber,用于订阅发布者;Subscription,表示订阅者与发布者之间的一对一生命周期;以及 Processor,作为订阅者和发布者的处理阶段。反应式流还有多种实现,例如 ReactiveX RXJava(https://github.com/ReactiveX/RxJava)和 Akka Streams(https://akka。io),Ratpack(https://ratpack.io),Vert.x(https://vertx.io),Slick(https://scala-slick.org/),Project Reactor(https://projectreactor.io),反应式关系数据库连接(R2DBC)(https://r2dbc.io/)等众多框架。
Reactive Streams API 在 Java 9 SDK 中有自己的实现;换句话说,从 2017 年 12 月起,Reactive Streams 版本 1.0.2 已成为 JDK9 的一部分。
项目反应框架
Project Reactor 3.5.x 是一个基于反应式流规范构建的库,将反应式编程范式引入 JVM。反应式编程是一种基于事件的模型,数据会在可用时推送给消费者,处理的是异步事件序列。它提供了完全异步和非阻塞的模式,是 JDK 中有限的异步代码实现方式(如回调、API 和 Future 接口)的替代方案。
Reactor 是一个完整的非阻塞反应式编程框架,能够管理背压并与 Java 8 及以上版本的函数式 API(如 CompletableFuture、Stream 和 Duration)进行集成。Reactor 提供了两个可组合的异步反应式 API:Flux [N](用于 N 个元素)和 Mono [0|1](用于 0 或 1 个元素)。Reactor 适合用于开发微服务架构,因为它提供了与 reactor-ipc 组件的进程间通信(IPC)以及支持背压的网络引擎,适用于 HTTP(包括 WebSockets、TCP 和 UDP),并且完全支持反应式编码和解码。
Mono,一种异步[0|1]结果
Mono 是一个专门的 Publisher 接口,它会发出一个项目,并可以选择性地以 onComplete 或 onError 信号结束。您可以使用操作符来处理这个项目(见图 7-1)。
Flux: 一个包含 [0|N] 项的异步序列
Flux 是一个 Publisher,表示一个异步序列,包含 0 到 N 个发出的项目,可以选择通过 onComplete 或 onError 信号来终止(见图 7-2)。
Project Reactor 提供了处理器、操作符和定时器,能够在每秒处理数千万条消息的同时保持高吞吐量,并且内存占用非常低。
使用 Spring Boot 开发响应式网页和数据应用程序
使用 Spring Boot 创建反应式应用程序变得更加简单。Spring Boot 通过添加 spring-boot-starter-webflux 启动器及相关的反应式数据依赖项,自动配置所有的 Web 和数据反应式应用程序。当您在 Spring Initializr (https://start.spring.io) 创建项目时,点击“添加依赖项”并输入“反应式”,您将看到一个与图 7-3 中所示相似的依赖项列表。
从 Web 到任何 SQL 或 NoSQL 数据的反应式处理。您可以采用任何编程风格,从使用注解到函数式编程,并使用 Mono或 Flux类的反应式类型来处理响应。当您使用 Spring Boot 与 Spring 反应式 Web(即 WebFlux)时,默认的应用程序容器是 Netty(不再使用 Tomcat)。您也可以选择使用 Undertow。
图 7-4 展示了在使用 Spring MVC 或 Spring WebFlux 时的不同编程模型。
正如你所见,我们可以选择从功能性端点到注解端点的不同选项。
基于 Spring Boot Reactive 的用户应用程序
对于我们的用户应用程序,我们将使用 WebFlux 和 R2DBC,以及 H2 和函数式编程来实现我们的端点,这与之前的版本略有不同。您可以通过访问 Spring Initializr (https://start.spring.io) 从零开始生成一个基础项目(无依赖项)。请确保将组字段设置为 com.apress,将工件和名称字段设置为 users。下载项目后,解压缩并导入到您喜欢的 IDE 中。到本节结束时,您应该能够看到图 7-5 中所示的结构。
接下来,打开 build.gradle 文件,并将其内容替换为列表 7-1 中所示的内容。
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.3'
id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
runtimeOnly 'io.r2dbc:r2dbc-h2'
// Reactive Postgres
//runtimeOnly 'org.postgresql:r2dbc-postgresql'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// Web
implementation 'org.webjars:bootstrap:5.2.3'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
tasks.named('test') {
useJUnitPlatform()
}
test {
testLogging {
events "passed", "skipped", "failed"
showExceptions true
exceptionFormat "full"
showCauses true
showStackTraces true
showStandardStreams = false
}
}
7-1 build.gradle
列表 7-1 展示了使用 WebFlux 和 R2DBC 所需的依赖项。请注意,我们需要包含数据库驱动程序(在本示例中使用 H2 数据库)并添加反应式驱动程序(在此情况下使用 r2dbc-h2 驱动程序)。如果 H2 不适合您,您可以切换到反应式 PostgreSQL 的 r2dbc-postgresql 依赖项。此外,请注意,我们使用了 reactor-test 依赖项,这使我们能够测试新的类型,如 Flux 和 Mono。
接下来,创建或打开用户类。请参阅列表 7-2。
package com.apress.users;
import jakarta.validation.constraints.NotBlank;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.util.Collection;
import java.util.UUID;
@Table("PEOPLE")
public record User(
@Id
UUID id,
@NotBlank(message = "Email can not be empty")
String email,
@NotBlank(message = "Name can not be empty")
String name,
String gravatarUrl,
@NotBlank(message = "Password can not be empty")
String password,
Collection<UserRole> userRole,
boolean active
) {
public User withGravatarUrl(String email) {
String url = UserGravatar.getGravatarUrlFromEmail(email);
return new User(UUID.randomUUID(), email, name, url, password, userRole, active);
}
}
7-2 src/main/java/apress/com/users/User.java
列表 7-2 展示了用户类的记录类型!是的,我们可以回到第 4 章中介绍的记录类型,但这次尽量简单。我们正在创建一个自定义方法,作为我们的工厂,用于构建一个新的用户(请记住记录类型的不可变性)。我们使用了@Table 注解,值为 PEOPLE,这将是表的名称。同时,我们也使用了@Id 注解。请注意,这里我们使用的是 Spring Data 核心,这是我们领域类的一个常见用例。
接下来,创建或打开 UserRepository 接口。请参阅第 7-3 号列表。
package com.apress.users;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;
import java.util.UUID;
public interface UserRepository extends ReactiveCrudRepository<User, UUID> {
Mono<User> findByEmail(String email);
Mono<Void> deleteByEmail(String email);
}
Listing 7-3src/main/java/apress/com/users/UserRepository.java
In Listing 7-3, we are using a new extended interface, ReactiveCrudRepository, which takes the domain class (in this case the User) and the identifier (in this case a UUID).
Next, create/open the UserRoutes class. See Listing 7-4.
package com.apress.users;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.BodyExtractors.toMono;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@RequiredArgsConstructor
@Configuration
public class UsersRoutes {
private final UserRepository userRepository;
@Bean
public RouterFunction<ServerResponse> getUsersRoute() {
return route(GET("/users"), request -> ServerResponse.ok()
.body(this.userRepository.findAll(),User.class));
}
@Bean
public RouterFunction<ServerResponse> postUserRoute() {
return route(POST("/users"), request -> request
.body(toMono(User.class))
.flatMap(this.userRepository::save)
.then(ServerResponse.ok().build()));
}
@Bean
public RouterFunction<ServerResponse> findUserByEmail(){
return route(GET("/users/{email}"), request -> ServerResponse.ok()
.body(this.userRepository.findByEmail(request.pathVariable("email")),User.class));
}
@Bean
public RouterFunction<ServerResponse> deleteUserByEmail(){
return route(DELETE("/users/{email}"), request -> {
this.userRepository.deleteByEmail(request.pathVariable("email"));
return ServerResponse.noContent().build();
});
}
}
7-4 src/main/java/apress/com/users/UserRoutes.java
在列表 7-4 中,我们为端点定义了路由并配置了服务器响应,所有这些都在 UserRoutes 类中(这与之前的版本不同,之前我们有两个独立的文件来定义路由,另一个文件用于响应)。请注意,这是一个 JavaConfig 类,我们用@Configuration 注解标记它,这意味着我们需要用@Bean 注解来声明 bean。我们使用 RouterFunction 和 ServerResponse 类型,并通过路由配置来接受请求并返回 ServerResponse。 这一切都非常简单,因为我们依赖于核心,即 Spring Data 核心和仓库编程模型(这意味着 Spring Data 为我们实现了 ReactiveCrudRepository 接口)。
因为我们使用 ServerResponse,它会根据来自反应式数据库的响应,将所有响应包装为 Flux 或 Mono 类。
如果我们仅使用 Project Reactor 来访问数据库(不使用 Spring R2DBC),我们需要编写如下代码片段:
ConnectionFactory connectionFactory = ConnectionFactories
.get("r2dbc:h2:mem:///testdb");
Mono.from(connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("SELECT name FROM PEOPLE WHERE email = $1")
.bind("$1", "norma@email.com")
.execute())
.flatMap(result -> result
.map((row, rowMetadata) -> row.get("name", String.class)))
.doOnNext(System.out::println)
.subscribe();
接下来,创建或打开 UserConfiguration 类。请参阅第 7-5 节。
package com.apress.users;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.mapping.event.BeforeConvertCallback;
import org.springframework.data.relational.core.sql.SqlIdentifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
@Configuration
public class UserConfiguration {
@Component
class GravatarUrlGeneratingCallback implements BeforeConvertCallback<User>{
@Override
public Mono<User> onBeforeConvert(User user, SqlIdentifier sqlIdentifier) {
if (user.id() == null && (user.gravatarUrl() == null || user.gravatarUrl().isEmpty())) {
return Mono.just(user.withGravatarUrl(user.email()));
}
return Mono.just(user);
}
}
@Bean
CommandLineRunner init(UserRepository userRepository){
return args -> {
userRepository.saveAll(Arrays.asList(new User(null,"ximena@email.com","Ximena",null,"aw2s0me", Arrays.asList(UserRole.USER),true)
,new User(null,"norma@email.com","Norma" ,null, "aw2s0me", Arrays.asList(UserRole.USER, UserRole.ADMIN),true)))
.blockLast(Duration.ofSeconds(10));
};
}
}
7-5 src/main/java/apress/com/users/UserConfiguration.java
让我们来分析一下用户配置类:
- 在 BeforeConvertCallback 中,我们声明了一个实现 BeforeConvertCallback 接口的内部类。我们在之前的版本中(用于数据持久化应用程序)已经实现过这个接口,而这次我们选择将其作为内部类声明,而不是单独创建一个类(选择哪个选项取决于您以及您希望在何处实现特定逻辑)。在这个方法中,我们检查 gravatarUrl 字段,并在其为 null 时进行设置。
- CommandLineRunner:我们并不是使用 ApplicationListener(它会监听 ApplicationReadyEvent),而是使用 CommandLineRunner 接口,这只是提醒我们在应用程序准备好时有多种执行代码的选择。
接下来,创建或打开 schema.sql 文件,如清单 7-6 所示。
drop table if exists people cascade;
create table people (
id uuid default random_uuid() not null,
email varchar(255) not null,
active boolean not null,
gravatar_url varchar(255),
name varchar(255),
password varchar(255),
user_role VARCHAR(100) array,
primary key (id));
7-6 src/main/resources/schema.sql
如果你想使用 PostgreSQL,需要将其更改为 uuid_generate_v4()并启用处理这些函数的插件
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
UserGravatar 和 UserRole 类与之前的版本保持一致。application.properties 文件为空。请记住,Spring Boot 会自动进行配置,这意味着它会根据 schema.sql 文件中的 SQL 语句初始化数据库,并设置连接所需的所有默认值。
用户应用测试
为了测试用户应用程序,我们需要使用一个不同的客户端,该客户端支持 Flux或 Mono类型。具体来说,我们需要在 build.gradle 文件中添加 reactor-test 依赖(见清单 7-1)。因此,请创建或打开 UsersHttpRequestTests 类。请参见清单 7-7。
package com.apress.users;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collection;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsersHttpRequestTests {
@Autowired
private WebTestClient webTestClient;
@Test
public void indexPageShouldReturnHeaderOneContent() throws Exception {
webTestClient.get().uri("/")
.exchange()
.expectStatus().isOk()
.expectBody(String.class).value( value -> {
assertThat(value).contains("Simple Users Rest Application");
});
}
@Test
public void usersEndPointShouldReturnCollectionWithTwoUsers() throws Exception {
webTestClient.get().uri("/users")
.exchange().expectStatus().isOk()
.expectBody(Collection.class).value( collection -> {
assertThat(collection.size()).isGreaterThanOrEqualTo(3);
});
}
@Test
public void userEndPointPostNewUserShouldReturnUser() throws Exception {
webTestClient.post().uri("/users")
.body(Mono.just(new User(null,"dummy@email.com","Dummy",null,"aw2s0me", Arrays.asList(UserRole.USER),true)),User.class)
.exchange().expectStatus().isOk();
}
@Test
public void userEndPointDeleteUserShouldReturnVoid() throws Exception {
webTestClient.delete().uri("/users/norma@email.com")
.exchange().expectStatus().isNoContent();
}
@Test
public void userEndPointFindUserShouldReturnUser() throws Exception{
webTestClient.get().uri("/users/ximena@email.com")
.exchange().expectStatus().isOk()
.expectBody(User.class).value( user -> {
assertThat(user).isNotNull();
assertThat(user.email()).isEqualTo("ximena@email.com");
});
}
}
7-7 src/test/java/apress/com/users/UsersHttpRequestTests.java
列表 7-7 显示我们使用的是 WebTestClient 类,而不是 TestRestTemplate 类。WebTestClient 类是一个用于测试 Web 服务器的客户端,它内部使用 WebClient 类来发送请求。WebTestClient 类提供了流畅的 API,使用 get()、post()和 delete()方法,使您能够轻松配置测试。这个客户端不仅可以通过 HTTP 连接到任何服务器,还可以连接到任何 WebFlux 应用程序。
要运行测试,您可以使用您的 IDE,或者执行以下命令:
./gradlew clean test
UsersHttpRequestTests > userEndPointFindUserShouldReturnUser() PASSED
UsersHttpRequestTests > userEndPointDeleteUserShouldReturnVoid() PASSED
UsersHttpRequestTests > indexPageShouldReturnHeaderOneContent() PASSED
UsersHttpRequestTests > userEndPointPostNewUserShouldReturnUser() PASSED
UsersHttpRequestTests > usersEndPointShouldReturnCollectionWithTwoUsers() PASSED