Future、CompletableFuture和Rxjava的Observable之间的区别

2023/08/05

1. 概述

在Java中,我们可以通过多种方式异步运行任务。Java内置了FutureCompletableFuture,我们还可以使用RxJava库,它为我们提供了Observable类。在本文中,我们将研究这三者之间的差异以及各自的好处和潜在用例。

2. Future

Future接口最初出现在Java 5中,提供的功能非常有限。Future的实例是一个结果的占位符,该结果将由异步进程生成,并且可能尚不可用。提供了一小部分方法来帮助完成此进程,我们可以取消任务或获取已完成任务的结果,还可以检查任务是否已被取消或完成。

若要了解此操作的实际效果,让我们创建一个示例异步任务。我们将有一个对象和一个Callable,它的作用就像从数据库中检索该对象一样。我们的对象可以非常简单:

class TestObject {
    int dataPointOne;
    int dataPointTwo;

    TestObject() {
        dataPointOne = 10;
    }
    // Standard getters and setters
}

因此,在调用构造函数时,我们返回一个包含数据点集之一的TestObject实例。现在,我们可以创建第二个实现Callable接口的类来为我们创建该对象:

class ObjectCallable implements Callable<TestObject> {
    @Override
    TestObject call() {
        return new TestObject();
    }
}

设置这两个对象后,我们可以编写一个测试来使用Future获取TestObject:

@Test
void whenRetrievingObjectWithBasicFuture_thenExpectOnlySingleDataPointSet() throws ExecutionException, InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<TestObject> future = exec.submit(new ObjectCallable());
    TestObject retrievedObject = future.get();
    assertEquals(10, retrievedObject.getDataPointOne());
    assertEquals(0, retrievedObject.getDataPointTwo());
}

在这里,我们创建了一个可以向其提交任务的ExecutorService。接下来,我们提交了ObjectCallable类并接收Future作为响应。最后,我们可以在Future上调用get()来获取结果。我们从断言中看到我们的对象填充了单个数据点。

3. CompletableFuture

CompletableFuture是随Java 8一起发布的Future接口的实现,它扩展了Future的基本功能,使我们能够更好地控制异步操作的结果,最大的附加功能之一是将函数调用链接到初始任务的结果上的选项。让我们通过重复我们在上一节中完成的任务来看看它的实际效果,但这一次,我们想在得到结果后对其进行水合。让我们创建一个具有水合方法的对象来填充TestObject中的第二个数据点:

class ObjectHydrator {
    TestObject hydrateTestObject(TestObject testObject) {
        testObject.setDataPointTwo(20);
        return testObject;
    }
}

这次我们还需要从Supplier的实现中检索我们的初始TestObject:

class ObjectSupplier implements Supplier<TestObject> {
    @Override
    TestObject get() {
        return new TestObject();
    }
}

准备好这两个类后,让我们使用它们:

@Test
void givenACompletableFuture_whenHydratingObjectAfterRetrieval_thenExpectBothDataPointsSet() throws ExecutionException, InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    ObjectHydrator objectHydrator = new ObjectHydrator();
    CompletableFuture<TestObject> future = CompletableFuture.supplyAsync(new ObjectSupplier(), exec)
        .thenApply(objectHydrator::hydrateTestObject);
    TestObject retrievedObject = future.get();
    assertEquals(10, retrievedObject.getDataPointOne());
    assertEquals(20, retrievedObject.getDataPointTwo());
}

这次我们可以从断言中看到,由于能够链接水合方法,我们已经在对象上设置了两个数据点。

4. RxJava的Observable

RxJava是一个库,允许我们按照响应式编程范例构建事件驱动和异步程序

要在项目中使用RxJava,我们需要将其导入到pom.xml中:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.6</version>
</dependency>

最新版本可在Maven仓库中找到。

这个库可以做很多事情,但这里我们将重点关注Observable类。Observable根据需要或在数据可用时向观察者提供数据,要异步运行任务,就像我们对Future和CompletableFuture所做的那样,我们可以创建一个Observable,它将在请求时从异步源生成数据:

@Test
void givenAnObservable_whenRequestingData_thenItIsRetrieved() {
    ObjectHydrator objectHydrator = new ObjectHydrator();
    Observable<TestObject> observable = Observable.fromCallable(new ObjectCallable()).map(objectHydrator::hydrateTestObject);
    observable.subscribe(System.out::println);
}

在这里,我们从ObjectCallable类创建了一个Observable,并使用map()来应用我们的水化器,然后我们订阅Observable并提供一个方法来处理结果。在我们的例子中,我们只是将结果记录下来。这给出了与我们的CompletableFuture实现完全相同的最终结果。subscribe()方法与CompletableFutures get()的作用相同。

虽然我们可以清楚地使用RxJava来实现与CompletableFuture相同的目的,但它的主要用例是它提供的大量其他功能。一个例子是以完全不同的方式再次执行相同的任务,我们可以创建一个Observable来等待数据到达,然后可以从其他地方将数据推送到它:

@Test
void givenAnObservable_whenPushedData_thenItIsReceived() {
    PublishSubject<Integer> source = PublishSubject.create();
    Observable<Integer> observable = source.observeOn(Schedulers.computation());
    observable.subscribe(System.out::println, (throwable) -> System.out.println("Error"), () -> System.out.println("Done"));

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);
    source.onComplete();
}

运行时,此测试会生成以下输出:

1
2
3
Done

因此,我们可以订阅尚未生成任何内容的数据源,然后只需等待即可。数据准备就绪后,我们使用onNext()将其推送到源,并通过订阅收到警报。这是RxJava允许的响应式编程风格的一个示例,我们对外部来源推送给我们的事件和新数据做出反应,而不是我们自己请求。

5. 总结

在本文中,我们了解了早期Java的Future接口如何提供有用但有限的异步执行任务并稍后获取结果的能力。接下来,我们探讨了较新的实现CompletableFuture带来的好处,这使我们能够将方法调用串在一起,并对整个过程提供更好的控制。

最后,我们看到我们可以使用RxJava执行相同的工作,但也注意到它是一个广泛的库,允许我们做更多的事情。我们简要了解了如何使用RxJava将任务异步推送到观察者,同时无限期地订阅数据流。

与往常一样,示例代码可以在GitHub上找到。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章