CompletableFuture allOf().join()与CompletableFuture.join()

2023/07/25

1. 概述

在本文中,我们将探讨CompletableFuture.allOf()方法的细节,并了解使用它与在多个单独的CompletableFuture实例上调用join()之间的区别。我们会发现allOf()使我们能够以非阻塞的方式继续我们的流程,同时还确保原子性。

2. join()与allOf()

CompletableFuture是Java 8中引入的一个强大功能,可促进和简化非阻塞代码的创建。在本文中,我们将重点介绍两种支持并行代码执行的方法:join()和allOf()

我们首先分析这两种方法的内部工作原理。接下来,我们将深入研究他们实现共同目标、并行执行代码以及随后合并结果的不同方法。对于本文的代码片段,我们将使用两个辅助函数,它们会阻塞线程一段时间,然后返回一些数据或引发异常:

private CompletableFuture waitAndReturn(long millis, String value) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(millis);
            return value;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
}

private CompletableFuture waitAndThrow(long millis) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(millis);
        } finally {
            throw new RuntimeException();
        }
    });
}

2.1 join()

CompletableFuture API公开join()方法,作为通过阻塞线程直到执行完成来检索Future对象值的一种方式。我们应该注意到,即使CompletableFuture的执行发生在不同的线程上,调用者线程也会被阻塞:

CompletableFuture<String> future = waitAndReturn(1_000, "Harry");
assertEquals("Harry", future.join());

此外,如果CompletableFuture完成时出现错误,join()会将其作为RuntimeException抛出

CompletableFuture<String> futureError = waitAndThrow(1_000);
assertThrows(RuntimeException.class, futureError::join);

2.2 allOf()

静态方法allOff()允许我们组合多个CompletableFuture实例并返回一个CompletableFuture<Void>,结果对象的完成取决于所有后续Future的完成。而且,如果后续Future中任何一个异常完成,则整体结果也将被视为失败。重要的是要了解allOf()不是阻塞方法,这意味着它将立即执行:

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndReturn(2_000, "Ron");

CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(f1, f2);

但是,为了提取值,我们需要调用API的其他方法。例如,如果我们对生成的CompletableFuture<Void>调用join(),线程将等待两个组合CompletableFuture对象完成-每个对象都在自己的线程中。换句话说,调用者线程将被阻塞的时间与长Future完成所需的时间相同:

combinedFutures.join();

由于主线程已经等待了两秒,因此f1和f2现已完成,后续的join()或get()等调用将立即执行:

assertEquals("Harry", f1.join());
assertEquals("Ron", f2.join());

2.3 并行执行代码

从前面的示例中我们可以注意到,我们可以并行执行CompletableFutures并只需对每个结果调用join()即可组合结果

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndReturn(2_000, "Ron");

sayHello(f1.join());
sayHello(f2.join());

或者通过迭代CompletableFuture的Collection或Stream,对它们中的每一个调用join()并消费它们的结果:

Stream.of(f1, f2).map(CompletableFuture::join).forEach(this::sayHello);

现在的问题是在迭代和拼接所有CompletableFuture之前使用静态allOf()方法是否会对最终结果产生任何影响:

CompletableFuture.allOf(f1, f2).join();
Stream.of(f1, f2).map(CompletableFututre::join).forEach(this::sayHello);

这两种方法之间有两个显著区别:错误处理和以非阻塞方式进行的能力。让我们深入研究它们并了解它们的特殊性。

3. 错误处理

两种方法之间的主要区别之一是,如果我们省略调用allOf,我们将按顺序处理CompletableFuture的结果。因此,我们最终可以对值进行部分处理

换句话说,如果其中一个CompletableFuture抛出异常,它将破坏链并停止处理。在某些情况下,这可能会导致错误,因为前面的元素已经处理过:

CompletableFuture<String> f1 = waitAndReturn(1_000, "Harry");
CompletableFuture<String> f2 = waitAndThrow(1_000);
CompletableFuture<String> f3 = waitAndReturn(1_000, "Ron");

Stream.of(f1, f2, f3)
    .map(CompletableFuture::join)
    .forEach(this::sayHello);

另一方面,我们可以使用allOf()组合三个实例,然后调用join()方法来实现某种原子性。通过这样做,我们要么立即处理所有元素,要么不处理任何元素

CompletableFuture.allOf(f1, f2, f3).join();
Stream.of(f1, f2, f3)
    .map(CompletableFuture::join) 
    .forEach(this::sayHello);

4. 非阻塞代码

allOf()的优点之一是它允许我们以非阻塞的方式继续我们的流程。由于返回类型是CompletableFuture<Void>,我们可以使用thenAccept()在数据到达时处理数据,而不会阻塞线程

CompletableFuture.allOf(f1, f2, f3)
    .thenAccept(__ -> sayHelloToAll(f1.join(), f2.join(), f3.join()));

类似地,如果我们需要合并来自不同Future的数据,我们可以使用thenApply()方法。例如,我们可以拼接三个future的值,并使用结果字符串继续非阻塞流程:

CompletableFuture<String> names = CompletableFuture.allOf(f1, f2, f3)
    .thenApply(__ -> f1.join() + "," + f2.join() + "," + f3.join());

此外,如果我们不离开异步世界并继续CompletableFuture链,我们将能够利用它自己的机制进行错误处理和恢复,即exceptionally()方法。例如,如果其中一个CompletableFuture完成时出现异常,我们可以简单地记录它并使用默认值继续流程:

CompletableFuture<String> names = CompletableFuture.allOf(f1, f2, f3)
    .thenApply(__ -> f1.join() + "," + f2.join() + "," + f3.join())
    .exceptionally(err -> {
        System.out.println("oops, there was a problem! " + err.getMessage());
        return "names not found!";
    });

5. 总结

在本文中,我们学习了如何使用CompletableFuture的join()进行并行代码执行,同时无缝合并结果。我们还揭示了allOf()的优点,join()允许我们以原子方式处理数据。换句话说,只有当所有组成的CompletableFuture对象都成功完成时,流程才会继续。

最后,我们发现我们可以使用allOf()并省略调用join()。这将使我们能够使用多个CompletableFuture的结果,同时继续非阻塞流程。我们通过API的其他有用方法实现了这一点,例如thenApply()、theAccept()和exceptionally()。

与往常一样,示例代码可以在GitHub上找到。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章