在Java中完成线程的作业后返回一个值

2023/06/07

1. 概述

Java的主要特性之一是并发性,它允许多个线程运行并执行并行任务。因此,我们可以执行异步和非阻塞指令。这将优化可用资源,尤其是当计算机具有多个CPU时。有两种类型的线程:有返回值或没有返回值(在后一种情况下,我们说它将有一个void返回方法)。

在本文中,我们将重点介绍如何从其作业已终止的线程返回一个值

2. Thread和Runnable

我们将Java线程称为轻量级进程,让我们看一下Java程序通常是如何工作的:

Java程序是一个正在执行的进程。线程是Java进程的子集,可以访问主内存。它可以与同一进程的其他线程通信。

线程具有生命周期和不同的状态。实现它的一种常见方法是通过Runnable接口:

public class RunnableExample implements Runnable {
    // ...
    @Override
    public void run() {
        // do something
    }
}

然后,我们可以开始我们的线程:

Thread thread = new Thread(new RunnableExample());
thread.start();
thread.join();

如我们所见,我们无法从Runnable返回值。但是,我们可以使用wait()和notify()与其他线程同步。join()方法将使执行处于等待状态,直到它完成。当我们从异步执行中获取结果时,我们稍后会看到这有多重要。

3. Callable

Java从1.5版本开始引入了Callable接口。让我们看一个异步任务返回阶乘计算值的示例,我们使用BigInteger,因为结果可能是一个很大的数字:

public class CallableFactorialTask implements Callable<BigInteger> {
    // fields and constructor
    @Override
    public BigInteger call() throws Exception {
        return factorial(BigInteger.valueOf(value));
    }
}

让我们也创建一个简单的阶乘计算器:

public class FactorialCalculator {

    public static BigInteger factorial(BigInteger end) {
        BigInteger start = BigInteger.ONE;
        BigInteger res = BigInteger.ONE;

        for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
            res = res.multiply(BigInteger.valueOf(i));
        }

        return res;
    }

    public static BigInteger factorial(BigInteger start, BigInteger end) {
        BigInteger res = start;

        for (int i = start.add(BigInteger.ONE).intValue(); i <= end.intValue(); i++) {
            res = res.multiply(BigInteger.valueOf(i));
        }

        return res;
    }
}

Callable只有一个方法call()需要我们覆盖,该方法将返回我们的异步任务的对象。

Callable和Runnable都是@FunctionalInterface。Callable可以返回一个值并抛出异常。但是,它需要一个Future来完成任务。

4. 执行Callable

我们可以使用Future或Fork/Join执行Callable。

4.1 Callable和Future

从1.5版本开始,Java具有Future接口来创建包含我们异步处理的响应的对象。我们可以在逻辑上将Future与Javascript中的Promise进行比较。

例如,当我们想要从多个端点获取数据时,我们通常会看到Future。因此,我们需要等待所有任务完成才能收集响应数据。

Future包装响应并等待线程完成。但是,我们可能会遇到中断,例如超时或执行异常。

让我们看一下Future接口:

public interface Future<V> {
    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

get方法很有趣,我们可以等待并获取执行的结果

要启动Future作业,我们将它的执行与ThreadPool相关联。这样,我们将为这些异步任务分配一些资源。

让我们创建一个使用Executor的示例,该Executor对我们之前看到的Callable中的阶乘数求和。我们将使用Executor接口和ExecutorService实现来创建ThreadPoolExecutor。我们可能想使用固定的或缓存的线程池。在本例中,我们将使用缓存线程池来演示:

public BigInteger execute(List<CallableFactorialTask> tasks) {

    BigInteger result = BigInteger.ZERO;

    ExecutorService cachedPool = Executors.newCachedThreadPool();

    List<Future<BigInteger>> futures;

    try {
        futures = cachedPool.invokeAll(tasks);
    } catch (InterruptedException e) {
        // exception handling example
        throw new RuntimeException(e);
    }

    for (Future<BigInteger> future : futures) {
        try {
            result = result.add(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // exception handling example
            throw new RuntimeException(e);
        }
    }

    return result;
}

我们可以用一张图来表示这个执行,我们可以在其中观察线程池和Callable是如何交互的:

Executor将在Future对象中调用和收集所有内容。然后,我们可以从异步处理中得到一个或多个结果。

让我们通过统计两个阶乘数的结果来测试:

@Test
void givenCallableExecutor_whenExecuteFactorial_thenResultOk() {
    BigInteger result = callableExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3)));
    assertEquals(BigInteger.valueOf(126), result);
}

4.2 Callable和Fork/Join

我们还可以选择使用ForkJoinPool。它仍然与ExecutorService类似,因为它扩展了AbstractExecutorService类。但是,它有不同的方式来创建和组织线程。它将任务分解为更小的任务并优化资源,使它们永远不会闲置。我们可以用图表表示子任务:

我们可以看到主任务将分叉为SubTask1、SubTask3和SubTask4作为最小的可执行块。最后,他们将合并最终结果。

让我们使用ForkJoinPool将前面的示例转换,我们可以将所有内容包装在一个execute方法中:

public BigInteger execute(List<Callable<BigInteger>> forkFactorials) {
    List<Future<BigInteger>> futures = forkJoinPool.invokeAll(forkFactorials);

    BigInteger result = BigInteger.ZERO;

    for (Future<BigInteger> future : futures) {
        try {
            result = result.add(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // exception handling example
            throw new RuntimeException(e);
        }
    }

    return result;
}

在这种情况下,我们只需要创建一个不同的池来获得我们的Future。让我们用阶乘Callable的列表来测试它:

@Test
void givenForkExecutor_whenExecuteCallable_thenResultOk() {
    assertEquals(BigInteger.valueOf(126), forkExecutor.execute(Arrays.asList(new CallableFactorialTask(5), new CallableFactorialTask(3))));
}

然而,我们也可以决定如何分解我们的任务。我们可能希望根据某些标准来分解我们的计算,例如,根据输入参数或服务负载。

我们需要将任务重写为ForkJoinTask,因此我们将使用RecursiveTask

public class ForkFactorialTask extends RecursiveTask<BigInteger> {
    // fields and constructor

    @Override
    protected BigInteger compute() {

        BigInteger factorial = BigInteger.ONE;

        if (end - start > threshold) {

            int middle = (end + start) / 2;

            return factorial.multiply(new ForkFactorialTask(start, middle, threshold).fork()
                    .join()
                    .multiply(new ForkFactorialTask(middle + 1, end, threshold).fork().join()));
        }

        return factorial.multiply(factorial(BigInteger.valueOf(start), BigInteger.valueOf(end)));
    }
}

如果应用某个阈值,我们将细分我们的主要任务。然后我们可以使用invoke()方法来获取结果:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); 
int result = forkJoinPool.invoke(forkFactorialTask);

此外,submit()或execute()也是一个选项。但是,我们总是需要join()命令来完成执行。

我们还创建一个测试,在其中执行阶乘子任务:

@Test
void givenForkExecutor_whenExecuteRecursiveTask_thenResultOk() {
    assertEquals(BigInteger.valueOf(3628800), forkExecutor.execute(new ForkFactorialTask(10, 5)));
}

在本例中,我们将10的阶乘分成两个任务。第一个将从1计算到5,而第二个将从6计算到10。

5. CompletableFuture

自1.8版本以来,Java通过引入CompletableFuture改进了多线程。它从Future执行中删除了样板代码,并添加了链接或组合异步结果等功能。但是,最重要的是,我们现在可以对任何方法进行异步计算,因此我们不受Callable的约束。此外,我们可以将语义不同的多个Future链接在一起。

5.1 supplyAsync()

使用CompletableFuture可以很简单:

CompletableFuture<BigInteger> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
// ...
BigInteger result = future.get();

我们不再需要Callable,我们可以将任何Lambda表达式作为参数传递。让我们用supplyAsync()测试阶乘方法:

@Test
void givenCompletableFuture_whenSupplyAsyncFactorial_thenResultOk() throws ExecutionException, InterruptedException {
    CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)));
    assertEquals(BigInteger.valueOf(3628800), completableFuture.get());
}

请注意,我们没有指定任何线程池。在这种情况下,将使用默认的ForkJoinPool。但是,我们可以指定一个Executor,例如,使用固定线程池:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(10)), Executors.newFixedThreadPool(1));

5.2 thenCompose()

我们还可以创建一个连续的Future链。假设我们有两个阶乘任务,第二个需要第一个任务的输入:

CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
    .thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));

BigInteger result = completableFuture.get();

我们可以使用thenCompose()方法在链的下一个中使用来自CompletableFuture的返回值。

让我们组合两个阶乘的执行。例如,我们从3开始,给出6的阶乘作为下一个阶乘的输入:

@Test
void givenCompletableFuture_whenComposeTasks_thenResultOk() throws ExecutionException, InterruptedException {
    CompletableFuture<BigInteger> completableFuture = CompletableFuture.supplyAsync(() -> factorial(BigInteger.valueOf(3)))
        .thenCompose(inputFromFirstTask -> CompletableFuture.supplyAsync(() -> factorial(inputFromFirstTask)));
    assertEquals(BigInteger.valueOf(720), completableFuture.get());
}

5.3 allOf()

有趣的是,我们可以使用接收输入可变参数的静态方法allOf()并行执行多个Future。

从多个执行中收集异步结果就像添加到allOf()和join()来完成任务一样简单:

BigInteger result = allOf(asyncTask1, asyncTask2)
    .thenApplyAsync(fn -> factorial(factorialTask1.join()).add(factorial(new BigInteger(factorialTask2.join()))), Executors.newFixedThreadPool(1)).join();

请注意,allOf()的返回类型为void。因此,我们需要手动从单个Future中获取结果。此外,我们可以在同一执行中运行具有不同返回类型的Future。

为了进行测试,让我们加入两个不同的阶乘任务。为了演示,一个是数字输入,而第二个是字符串:

@Test
void givenCompletableFuture_whenAllOfTasks_thenResultOk() {
    CompletableFuture<BigInteger> asyncTask1 = CompletableFuture.supplyAsync(() -> BigInteger.valueOf(5));
    CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(() -> "3");

    BigInteger result = allOf(asyncTask1, asyncTask2)
        .thenApplyAsync(fn -> factorial(asyncTask1.join()).add(factorial(new BigInteger(asyncTask2.join()))), Executors.newFixedThreadPool(1))
            .join();

    assertEquals(BigInteger.valueOf(126), result);
}

6. 总结

在本教程中,我们了解了如何从线程返回对象。我们看到了如何将Callable与Future和线程池结合使用,Future包装结果并等待所有任务完成。我们还看到了ForkJoinPool的一个例子,可以将我们的执行优化为多个子任务。

Java 8中的CompletableFuture的工作方式类似,但还提供了新功能,例如可以执行任何Lambda表达式。它还允许我们链接和组合异步任务的结果。

最后,我们用Future、Fork和CompletableFuture测试了一个简单的阶乘任务。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章