使用并行收集器和虚拟线程进行并行收集处理

2025/03/30

1. 简介

上一篇文章中,我们介绍了parallel-collectors,这是一个小型零依赖库,可以在自定义线程池上实现Stream API的并行处理。

Project Loom是将轻量级虚拟线程(以前称为Fibers)引入JVM的组织工作的代号,该工作已在JDK 21中完成。

让我们看看如何在并行收集器中利用这一点。

2. Maven依赖

如果我们想要开始使用该库,我们需要在Maven的pom.xml文件中添加一个条目:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>3.0.0</version>
</dependency>

或者Gradle构建文件中的一行:

compile 'com.pivovarit:parallel-collectors:3.0.0'

最新版本可以在Maven Central上找到。

3. 使用操作系统线程与虚拟线程进行并行处理

3.1 操作系统线程并行性

让我们看看为什么使用虚拟线程进行并行处理是一个重点。

我们先创建一个简单的例子,我们需要一个并行操作,即人为延迟的字符串拼接:

private static String fetchById(int id) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // ignore shamelessly
    }
    return "user-" + id;
}

我们还将使用自定义代码来测量执行时间:

private static <T> T timed(Supplier<T> supplier) {
    var before = Instant.now();
    T result = supplier.get();
    var after = Instant.now();
    log.info("Execution time: {} ms", Duration.between(before, after).toMillis());
    return result;
}

现在,让我们创建一个简单的并行流处理示例,其中我们创建n个元素,然后在n个线程上以n的并行度处理它们:

@Test
public void processInParallelOnOSThreads() {
    int parallelProcesses = 5_000;
    var e = Executors.newFixedThreadPool(parallelProcesses);

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
            .collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), e, parallelProcesses))
            .join());

    log.info("{}", result);
}

当我们运行它时,我们可以观察到它显然完成了工作,因为我们不需要等待5000秒才能获得结果:

Execution time: 1321 ms
[user-0, user-1, user-2, ...]

但是让我们看看如果我们尝试将并行处理的元素数量增加到20000会发生什么:

[2.795s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (...)
[2.795s][warning][os,thread] Failed to start the native thread for java.lang.Thread "pool-1-thread-16111"

基于操作系统线程的方法无法扩展,因为创建线程的成本很高,而且我们很快就会达到资源限制

让我们看看如果切换到虚拟线程会发生什么。

3.2 虚拟线程并行

在Java 21之前,很难为线程池配置提供合理的默认值。幸运的是,虚拟线程不需要任何默认值-我们可以根据需要创建任意数量的线程,并且它们在共享的ForkJoinPool实例上进行内部调度,这使得它们非常适合运行阻塞操作。

如果我们运行的是Parallel Collectors 3.x,我们可以毫不费力地利用虚拟线程:

@Test
public void processInParallelOnVirtualThreads() {
    int parallelProcesses = 5_000;

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
            .collect(ParallelCollectors.parallel(i -> fetchById(i), toList()))
            .join());
}

我们可以看到,这就像省略执行器和并行参数一样简单,因为虚拟线程是默认的执行实用程序。

如果我们尝试运行它,我们可以看到它实际上比原始示例完成得更快:

Execution time: 1101 ms
[user-0, user-1, user-2, ...]

这是因为我们创建了5000个虚拟线程,这些线程是使用一组非常有限的操作系统线程进行调度的。

让我们尝试将并行度增加到20000,这在传统的Executor中是不可能的:

Execution time: 1219 ms
[user-0, user-1, user-2, ...]

这不仅成功执行,而且比OS线程上小4倍的作业完成得更快。

我们将并行度增加到100000并看看会发生什么:

Execution time: 1587 ms
[user-0, user-1, user-2, ...]

尽管观察到相当大的开销,但工作正常。

如果我们将并行度提高到1000000会怎么样?

Execution time: 6416 ms
[user-0, user-1, user-2, ...]

2000000?

Execution time: 12906 ms
[user-0, user-1, user-2, ...]

5000000?

Execution time: 25952 ms
[user-0, user-1, user-2, ...]

可以看到,我们可以轻松扩展到使用OS线程无法实现的高水平并行性。除了在较小的并行工作负载上的性能改进外,这也是利用虚拟线程并行处理阻塞操作的主要优势。

3.3 虚拟线程和旧版本的Parallel Collectors

利用虚拟线程的最简单方法是升级到库的最新版本,但如果这不可能,我们也可以在JDK 21上运行时使用2.xy版本来实现这一点。

诀窍是手动提供Executors.newVirtualThreadPerTaskExecutor()作为执行器,并提供Integer.MAX_VALUE作为最大并行级别:

@Test
public void processInParallelOnVirtualThreadsParallelCollectors2() {
    int parallelProcesses = 100_000;

    var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
            .collect(ParallelCollectors.parallel(
                    i -> fetchById(i), toList(),
                    Executors.newVirtualThreadPerTaskExecutor(), Integer.MAX_VALUE))
            .join());

    log.info("{}", result);
}

4. 总结

在本文中,我们了解了如何轻松地利用Parallel Collectors库中的虚拟线程,事实证明,其扩展性比传统的基于操作系统线程的解决方案要好得多。我们的测试机器最终在大约16000个线程时达到资源限制,而轻松扩展到数百万个虚拟线程是可能的。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章