使用Spring Reactive WebClient将Flux读入单个InputStream

2023/05/13

1. 概述

在本教程中,我们将深入研究Java响应式编程,以解决一个有趣的问题,即如何将Flux<DataBuffer>读取到单个InputStream中。

2. 请求设置

作为解决将Flux<DataBuffer>读入单个InputStream的问题的第一步,我们将使用Spring响应式WebClient来发出GET请求。此外,我们可以将gorest.co.in托管的公共API端点之一用于此类测试场景:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

接下来,让我们定义用于获取WebClient实例的getWebClient()方法:

public class DataBufferToInputStream {

    private static WebClient getWebClient() {
        WebClient.Builder webClientBuilder = WebClient.builder();
        return webClientBuilder.build();
    }
}

此时,我们已准备好向/public/v2/users端点发出GET请求。但是,我们必须将响应主体作为Flux<DataBuffer>对象获取。因此,让我们继续下一节关于BodyExtractor的内容来精确地执行此操作。

3. BodyExtractor和DataBufferUtils

我们可以使用Spring-Webflux中提供的BodyExtractors类的toDataBuffers()方法将响应体提取到Flux<DataBuffer>中

让我们继续创建body作为Flux<DataBuffer>类型的实例:

Flux<DataBuffer> body = client.get()
    .uri(url)
    .exchangeToFlux(clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

接下来,由于我们需要将这些DataBuffer流收集到单个InputStream中,实现此目的的一个好策略是使用PipedInputStreamPipedOutputStream

此外,我们打算写入PipedOutputStream并最终从PipedInputStream读取。因此,让我们看看如何创建这两个连接的流:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024 * 10);
inputStream.connect(outputStream);

我们必须注意,默认大小是1024字节。但是,我们预计从Flux<DataBuffer>收集的结果可能会超过默认值。因此,我们需要明确指定一个更大的值,在本例中为1024 * 10。

最后,我们使用DataBufferUtils类中提供的write()工具方法将body作为发布者写入outputStream:

DataBufferUtils.write(body, outputStream).subscribe();

我们必须注意,我们在声明时将inputStream连接到outputStream。因此,我们最好从inputStream中读取。让我们继续下一节,看看它的实际效果。

4. 从PipedInputStream读取

首先,让我们定义一个工具方法readContent(),以将InputStream读取为String对象:

private static String readContent(InputStream stream) throws IOException {
    StringBuilder contentStringBuffer = new StringBuilder();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    logger.info(String.format("read %d bytes from the streamn", byteCount));
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

接下来,因为在不同的线程中读取PipedInputStream是一种典型的做法,所以让我们创建readContentFromPipedInputStream()方法,该方法在内部生成一个新线程,通过调用readContent()方法将PipedInputStream中的内容读取到String对象中:

private static String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try (stream) {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return String.valueOf(contentStringBuffer);
}

在此阶段,我们的代码已准备好用于模拟。让我们看看它的实际效果:

public static void main(String[] args) throws IOException, InterruptedException {
	WebClient webClient = getWebClient();
	InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
	Thread.sleep(3000);
	String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
	logger.info("response content: \n{}", content.replace("}","}\n"));
}

当我们处理异步系统时,我们在从流中读取之前将读取延迟任意3秒,以便我们能够看到完整的响应。此外,在日志记录时,我们插入换行符以将长输出分成多行。

最后,让我们验证代码执行生成的输出:

16:44:04.120 [main] INFO cn.tuyucheng.taketoday.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]

5. 总结

在本文中,我们使用了管道流的概念以及BodyExtractors和DataBufferUtils类中可用的工具方法来将Flux<DataBuffer>读入单个InputStream中。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章