Jetty ReactiveStreams HTTP客户端

2025/03/30

1. 概述

在本教程中,我们将学习如何使用Jetty中的Reactive HTTP客户端。我们将通过创建小型测试用例来演示其与不同Reactive库的用法。

2. 什么是Reactive HttpClient?

Jetty的HttpClient允许我们执行阻塞HTTP请求。但是,当我们处理Reactive API时,我们不能使用标准HTTP客户端。为了填补这一空白,Jetty为HttpClient API创建了一个包装器,以便它也支持ReactiveStreams API。

Reactive HttpClient用于通过HTTP调用消费或生成数据流

我们在此演示的示例将有一个响应式HTTP客户端,它将使用不同的Reactive库与Jetty服务器进行通信。我们还将讨论Reactive HttpClient提供的请求和响应事件。

建议阅读有关Project ReactorRxJavaSpring WebFlux的文章,以更好地理解响应式编程概念及其术语。

3. Maven依赖

让我们从在pom.xml中添加Reactive StreamsProject ReactorRxJavaSpring WebFluxJetty Reactive HTTPClient的依赖开始示例。除此之外,我们还将添加Jetty Server的依赖以创建服务器:

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

4. 创建服务器和客户端

现在让我们创建一个服务器并添加一个请求处理程序,只需将请求正文写入响应:

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

然后我们可以编写HttpClient:

HttpClient httpClient = new HttpClient();
httpClient.start();

现在我们已经创建了客户端和服务器,让我们看看如何将这个阻塞HTTP客户端转换为非阻塞客户端并创建请求:

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

因此,Jetty提供的ReactiveRequest包装器使我们的阻塞HTTP客户端具有响应性,让我们继续看看它与不同的响应式库的用法。

5. ReactiveStreams用法

Jetty的HttpClient原生支持Reactive Streams,所以让我们从那里开始。

现在,Reactive Streams只是一组接口,因此,为了进行测试,让我们实现一个简单的阻塞订阅者:

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(1);
    }

    @Override
    public void onNext(ReactiveResponse response) {
        sink.offer(response);
    }

    @Override
    public void onError(Throwable failure) { }

    @Override
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }
}

请注意,我们需要根据JavaDoc调用Subscription#request,其中指出“在通过此方法发出需求信号之前,Publisher不会发送任何事件”。

另外,请注意,我们添加了一种安全机制,以便我们的测试在5秒内没有看到该值时可以退出。

现在,我们可以快速测试我们的HTTP请求:

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. Project Reactor使用

现在让我们看看如何将Reactive HttpClient与Project Reactor结合使用,发布者的创建与上一节基本相同。

创建发布者后,让我们使用Project Reactor中的Mono类来获取响应式响应:

ReactiveResponse response = Mono.from(publisher).block();

然后,我们可以测试结果的响应:

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1 Spring WebFlux使用

与Spring WebFlux一起使用时,将阻塞HTTP客户端转换为响应式客户端非常容易。Spring WebFlux附带了一个响应式客户端WebClient,可与各种HTTP客户端库一起使用。我们可以使用它作为直接使用Project Reactor代码的替代方案。

因此首先,让我们使用JettyClientHttpConnector包装Jetty的HTTP客户端,并将其与WebClient绑定:

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

然后将此连接器传递给WebClient以执行非阻塞HTTP请求:

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

接下来,让我们使用刚刚创建的Reactive HTTPClient进行实际的HTTP调用并测试结果:

String responseContent = client.post()
    .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
    .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
    .retrieve()
    .bodyToMono(String.class)
    .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. RxJava2使用

现在让我们继续看看如何将Reactive HTTP客户端与RxJava2一起使用。

在这里,让我们稍微改变一下我们的例子,现在在请求中包含一个主体:

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
    .content(ReactiveRequest.Content
        .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
    .build();
Publisher<String> publisher = reactiveRequest
    .response(ReactiveResponse.Content.asString());

代码ReactiveResponse.Content.asString()将响应主体转换为字符串,如果我们只对请求的状态感兴趣,也可以使用ReactiveResponse.Content.discard()方法丢弃响应。

现在,我们可以看到使用RxJava2获取响应实际上与Project Reactor非常相似。基本上,我们只是使用Single而不是Mono:

String responseContent = Single.fromPublisher(publisher)
    .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. 请求和响应事件

Reactive HTTP客户端在执行过程中会发出许多事件,它们分为请求事件和响应事件,这些事件有助于了解Reactive HTTP客户端的生命周期。

这次,让我们使用HTTP客户端而不是请求来稍微不同地发出响应请求:

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
    .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
    .build();

现在让我们获取HTTP请求事件的发布者:

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

现在,让我们再次使用RxJava。这次,我们将创建一个包含事件类型的列表,并通过订阅发生的请求事件来填充它:

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
    .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

然后,由于我们正在测试,我们可以阻塞我们的响应并验证:

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

类似地,我们也可以订阅响应事件。由于它们与请求事件订阅类似,因此我们在这里仅添加了后者。可以在本文末尾链接的GitHub仓库库中找到包含请求和响应事件的完整实现。

9. 总结

在本教程中,我们了解了Jetty提供的ReactiveStreams HttpClient、它与各种Reactive库的用法以及与Reactive请求相关的生命周期事件。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章