1. 概述
通常,在我们的应用程序中发出HTTP请求时,我们会按顺序执行这些调用。但是,在某些情况下,我们可能希望同时执行这些请求。
例如,当我们从多个来源检索数据时,或者当我们只是想尝试提高我们的应用程序性能时,我们可能希望这样做。
在这个快速教程中,我们将介绍几种方法,看看我们如何通过使用Spring响应式WebClient进行并行服务调用来实现这一点。
2. 响应式编程回顾
作为快速回顾,WebClient是在Spring 5中引入的,并且作为Spring Web Reactive模块的一部分包含在内。它为发送HTTP请求提供了一个响应式的、非阻塞的接口。
有关使用WebFlux进行响应式编程的深入指南,请查看我们的Spring 5 WebFlux指南。
3. 简单的用户服务
在我们的示例中,我们将使用一个简单的用户API。此API有一个GET方法,它公开了一个方法getUser用于使用id作为参数检索用户。
让我们看一下如何通过单个调用来检索给定ID的用户:
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> getUser(int id) {
LOG.info(String.format("Calling getUser(%d)", id));
return webClient.get()
.uri("/user/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
在下一节中,我们将学习如何并发调用此方法。
4. 同时调用WebClient
在本节中,我们将看到几个并发调用getUser方法的示例。我们还将在示例中查看发布者实现Flux和Mono。
4.1 对同一服务的多次调用
现在假设我们想要同时获取五个用户的数据并将结果作为用户列表返回:
public Flux fetchUsers(List userIds) {
return Flux.fromIterable(userIds)
.flatMap(this::getUser);
}
让我们分解这些步骤来理解我们做了什么:
我们首先使用静态fromIterable方法从我们的userIds列表创建一个Flux。
接下来,我们调用flatMap来运行我们之前创建的getUser方法。默认情况下,此响应式运算符的并发级别为256,这意味着它最多同时执行256个getUser调用。这个数字可以使用flatMap的重载版本通过方法参数进行配置。
值得注意的是,由于操作是并行发生的,因此我们不知道结果顺序。如果我们需要维护输入顺序,我们可以改用flatMapSequential运算符。
由于Spring WebClient在底层使用非阻塞HTTP客户端,因此用户无需定义任何调度程序。WebClient负责安排调用并在内部适当的线程上发布它们的结果,而不会阻塞。
4.2 对不同服务的多次调用返回相同类型
现在让我们看一下如何同时调用多个服务。
在此示例中,我们将创建另一个返回相同User类型的端点:
public Mono<User> getOtherUser(int id) {
return webClient.get()
.uri("/otheruser/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
现在,并行执行两个或多个调用的方法变为:
public Flux fetchUserAndOtherUser(int id) {
return Flux.merge(getUser(id), getOtherUser(id));
}
这个例子的主要区别在于我们使用了静态方法merge而不是fromIterable方法。使用merge方法,我们可以将两个或多个Flux合并为一个结果。
4.3 对不同服务不同类型的多次调用
两个服务返回相同内容的概率相当低。更典型的是,我们会有另一个服务提供不同的响应类型,我们的目标是合并两个(或多个)响应。
Mono类提供了静态zip方法,它允许我们组合两个或多个结果:
public Mono fetchUserAndItem(int userId, int itemId) {
Mono user = getUser(userId);
Mono item = getItem(itemId);
return Mono.zip(user, item, UserWithItem::new);
}
zip方法将给定的user和item Mono组合成一个类型为UserWithItem的新Mono。这是一个简单的POJO对象,它包装了一个User和Item。
5. 测试
在本节中,我们将了解如何测试我们已经看到的代码,特别是验证服务调用是否并行发生。
为此,我们将使用Wiremock创建一个Mock服务器,并测试fetchUsers方法:
@Test
public void givenClient_whenFetchingUsers_thenExecutionTimeIsLessThanDouble() {
int requestsNumber = 5;
int singleRequestTime = 1000;
for (int i = 1; i <= requestsNumber; i++) {
stubFor(get(urlEqualTo("/user/" + i)).willReturn(aResponse().withFixedDelay(singleRequestTime)
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(String.format("{ \"id\": %d }", i))));
}
List<Integer> userIds = IntStream.rangeClosed(1, requestsNumber)
.boxed()
.collect(Collectors.toList());
Client client = new Client("http://localhost:8089");
long start = System.currentTimeMillis();
List<User> users = client.fetchUsers(userIds).collectList().block();
long end = System.currentTimeMillis();
long totalExecutionTime = end - start;
assertEquals("Unexpected number of users", requestsNumber, users.size());
assertTrue("Execution time is too big", 2 * singleRequestTime > totalExecutionTime);
}
在此示例中,我们采用的方法是Mock用户服务并使其在一秒钟内响应任何请求。现在,如果我们使用WebClient进行五个调用,我们可以假设它不应该超过两秒钟,因为调用是同时发生的。
要了解用于测试WebClient的其他技术,请查看我们在Spring中Mock WebClient的指南。
6. 总结
在本教程中,我们探索了几种使用Spring 5 Reactive WebClient同时进行HTTP服务调用的方法。
首先,我们展示了如何并行调用同一服务。后来,我们看到了如何调用两个返回不同类型的服务的示例。然后,我们展示了如何使用Mock服务器测试此代码。
与往常一样,本教程的完整源代码可在GitHub上获得。