Java 8并行流中的自定义线程池

2023/06/07

1. 概述

Java 8引入了Stream的概念,作为对数据执行批量操作的一种有效方式,并且可以在支持并发的环境中获取并行流。

这些流可以带来更高的性能,但要以多线程开销为代价。

在这个快速教程中,我们将了解Stream API的最大限制之一,并了解如何使并行流与自定义ThreadPool实例一起工作,或者,有一个库可以处理这个问题

2. 并行流

让我们从一个简单的例子开始-在任何Collection类型上调用parallelStream方法都将返回一个可能并行的Stream:

@Test
void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();

    assertTrue(parallelStream.isParallel());
}

在这种流中发生的处理默认使用ForkJoinPool.commonPool(),这是一个由整个应用程序共享的线程池

3. 自定义线程池

我们实际上可以在处理流时传递一个自定义的ThreadPool

以下示例让并行流使用自定义ThreadPool来计算从1到100万的long值之和:

@Test
void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException {
    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
        .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
        () -> aList.parallelStream().reduce(0L, Long::sum)).get();

    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

我们使用并行级别为4的ForkJoinPool构造函数。需要进行一些实验来确定不同环境的最佳值,但一个好的经验法则是根据CPU的内核数来选择数量。

接下来,我们处理并行流的内容,并在reduce调用中将其相加。

这个简单的示例可能无法演示使用自定义线程池的全部用途,但在我们不想将公共线程池与长时间运行的任务(例如处理来自网络源的数据)捆绑在一起,或者公共线程池正被应用程序中的其他组件使用的情况下,其好处就显而易见了。

如果我们运行上面的测试方法,它就会通过。到目前为止,一切都很好。

但是,如果我们像在测试方法中一样在普通方法中实例化ForkJoinPool类,则可能会导致OutOfMemoryError。

接下来,让我们仔细看看内存泄漏的原因。

4. 谨防内存泄漏

正如我们之前谈到的,公共线程池默认情况下被整个应用程序使用。公共线程池是一个静态的ThreadPool实例

因此,如果我们使用默认线程池,则不会发生内存泄漏。

现在,让我们回顾一下我们的测试方法。在测试方法中,我们创建了一个ForkJoinPool对象,当测试方法完成时,customThreadPool对象不会被取消引用和垃圾回收-相反,它将等待分配新任务

也就是说,我们每次调用测试方法时,都会创建一个新的customThreadPool对象,并且不会被释放。

问题的解决方法非常简单:即在我们执行完方法后关闭customThreadPool对象:

try {
    long actualTotal = customThreadPool.submit(
        () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. 总结

我们已经简要介绍了如何使用自定义ThreadPool运行并行流。在正确的环境中,通过正确使用并行级别,在某些情况下可以获得性能提升。

如果我们创建一个自定义的ThreadPool,我们应该记住调用它的shutdown()方法以避免内存泄漏。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章