1. 概述
在响应式编程中,处理和转换数据流对于构建响应式应用程序至关重要。创建Mono实例的两种常用方法是Mono.fromCallable和Mono.justOrEmpty,这两种方法都有其独特的用途,具体取决于我们想要如何处理流中的可空性和惰性求值。
在本教程中,我们将探讨这些方法之间的差异,展示Mono.fromCallable如何通过包装计算来延迟执行并优雅地处理错误,而Mono.justOrEmpty如何直接从可选值创建Mono实例,从而简化可能有空数据的情况。
2. Mono简介
Mono是Project Reactor中的一个Publisher,表示最多发出一个值的流。它可以是完整的,带有值,可以为空,也可以因错误而终止。
Mono支持两种类型的发布者:冷发布者(cold publisher)和热发布者(hot publisher)。
冷发布者只会在消费者订阅后才发布元素,确保每个消费者从一开始就收到数据,而热发布者则会在创建后立即发出数据,无论是否订阅。
Mono的fromCallable是冷发布者的一个示例:订阅后它会延迟返回该Mono。相反,justOrEmpty是一个表现为热发布者的Mono,它会立即发出数据而无需等待任何订阅。
来源:projectreactor.io
3. Mono.fromCallable
fromCallable接收一个Callable接口并返回一个Mono,该Mono会延迟该Callable的执行,直到有对其的订阅。如果Callable解析为null,则生成的Mono为空。
让我们考虑一个示例用例,其中我们在每次方法调用之间以一致的5秒延迟来获取数据。我们将此逻辑设置为延迟,因此它仅在订阅时执行:
public String fetchData() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Data Fetched";
}
接下来,我们将定义一个使用fromCallable创建Mono发布者的方法,timeTakenForCompletion属性测量从订阅开始到收到onComplete信号之间的持续时间:
public void givenDataAvailable_whenCallingFromCallable_thenLazyEvaluation() {
AtomicLong timeTakenForCompletion = new AtomicLong();
Mono<String> dataFetched = Mono.fromCallable(this::fetchData)
.doOnSubscribe(subscription -> timeTakenForCompletion.set(-1 * System.nanoTime()))
.doFinally(consumer -> timeTakenForCompletion.addAndGet(System.nanoTime()));
StepVerifier.create(dataFetched)
.expectNext("Data Fetched")
.verifyComplete();
}
最后,断言验证从订阅到接收onComplete信号的时间与预期的5秒延迟非常接近,从而确认fromCallable延迟执行直到订阅:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForCompletion.get()))
.isCloseTo(5000L, Offset.offset(50L));
3.1 内置错误处理
fromCallable还支持内置错误处理,如果Callable抛出异常,fromCallable会捕获它,从而允许错误通过响应流传播。
让我们考虑同一个例子来了解fromCallable的错误处理:
public void givenExceptionThrown_whenCallingFromCallable_thenFromCallableCapturesError() {
Mono<String> dataFetched = Mono.fromCallable(() -> {
String data = fetchData();
if (data.equals("Data Fetched")) {
throw new RuntimeException("ERROR");
}
return data;
})
.onErrorResume(error -> Mono.just("COMPLETED"));
StepVerifier.create(dataFetched)
.expectNext("COMPLETED")
.verifyComplete();
}
4. Mono.justOrEmpty
justOrEmpty创建一个Mono,该Mono包含一个值,如果值为null则为空。与fromCallable不同,它不会推迟执行,而是在创建Mono后立即执行。
justOrEmpty不会传播错误,因为它被设计用于处理可空值。
让我们重新回顾一下之前模拟数据获取的用例,这次,我们将使用justOrEmpty方法来创建一个Mono类型的发布者。
timeTakenToReceiveOnCompleteSignalAfterSubscription属性跟踪从订阅到接收onComplete信号的时间,timeTakenForMethodCompletion属性测量该方法完成所需的总时间:
public void givenDataAvailable_whenCallingJustOrEmpty_thenEagerEvaluation() {
AtomicLong timeTakenToReceiveOnCompleteSignalAfterSubscription = new AtomicLong();
AtomicLong timeTakenForMethodCompletion = new AtomicLong(-1 * System.nanoTime());
Mono<String> dataFetched = Mono.justOrEmpty(fetchData())
.doOnSubscribe(subscription -> timeTakenToReceiveOnCompleteSignalAfterSubscription
.set(-1 * System.nanoTime()))
.doFinally(consumer -> timeTakenToReceiveOnCompleteSignalAfterSubscription
.addAndGet(System.nanoTime()));
timeTakenForMethodCompletion.addAndGet(System.nanoTime());
StepVerifier.create(dataFetched)
.expectNext("Data Fetched")
.verifyComplete();
}
让我们写一个断言来证明从订阅到收到onComplete信号的时间非常短,从而确认Mono是被急切创建的:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenToReceiveOnCompleteSignalAfterSubscription
.get())).isCloseTo(1L, Offset.offset(1L));
接下来,让我们确认5秒的延迟包含在方法的完成时间中,并且发生在订阅Mono之前:
assertThat(TimeUnit.NANOSECONDS.toMillis(timeTakenForMethodCompletion.get()))
.isCloseTo(5000L, Offset.offset(50L));
5. 何时使用fromCallable
让我们看看可以使用Mono.fromCallable()方法的用例:
- 当我们需要有条件地订阅发布者以节省资源时
- 每次订阅都可能带来不同的结果
- 当操作有可能引发异常时,我们希望它们通过响应流传播
5.1 示例用法
让我们来看一个有条件延迟执行有益的示例用例:
public Optional<String> fetchLatestStatus() {
List<String> activeStatusList = List.of("ARCHIVED", "ACTIVE");
if (activeStatusList.contains("ARCHIVED")) {
return Optional.empty();
}
return Optional.of(activeStatusList.get(0));
}
public void givenLatestStatusIsEmpty_thenCallingFromCallableForEagerEvaluation() {
Optional<String> latestStatus = fetchLatestStatus();
String updatedStatus = "ACTIVE";
Mono<String> currentStatus = Mono.justOrEmpty(latestStatus)
.switchIfEmpty(Mono.fromCallable(()-> updatedStatus));
StepVerifier.create(currentStatus)
.expectNext(updatedStatus)
.verifyComplete();
}
在此示例中,Mono发布者在switchIfEmpty方法中使用fromCallable进行定义,允许其有条件地执行。因此,只有当Mono被订阅时才会返回ACTIVE状态,使其成为惰性执行。
6. 总结
在本文中,我们讨论了Mono.fromCallable方法(充当冷发布者)和Mono.justOrEmpty(充当热发布者),我们还探讨了何时使用fromCallable方法而不是justOrEmpty,强调了它们的区别并讨论了示例用例。
Post Directory
