Java中的并行化for循环

2023/07/22

1. 概述

有时,我们可能需要在for循环中处理大量元素。按顺序执行此操作可能会花费大量时间,并使系统无法得到充分利用。

在本教程中,我们将学习在Java中并行化for循环的不同方法,以提高此类情况下应用程序的性能。

2. 顺序处理

让我们首先看看如何在for循环中顺序处理元素并测量处理元素所需的时间。

2.1 使用for循环进行顺序处理

首先,我们将创建一个运行100次的for循环,并在每次迭代中执行繁重操作。

繁重操作的常见示例包括数据库调用、网络调用或CPU密集型操作。为了模拟繁重操作所花费的时间,让我们在每次迭代中调用Thread.sleep()方法:

public class Processor {

    public void processSerially() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            Thread.sleep(10);
        }
    }
}

在上面的代码中,我们在每次迭代中调用Thread.sleep()方法,这会导致执行暂停10毫秒。当我们运行processSerially()方法时,需要花费大量时间来顺序处理元素。

在接下来的部分中,我们将通过并行化for循环来优化此方法。最后,我们将比较顺序处理和并行处理所花费的时间。

3. 使用ExecutorService进行并行处理

ExecutorService是一个代表异步执行机制的接口,它允许我们提交要执行的任务并提供管理它们的方法

让我们看看如何使用ExecutorService接口来并行化for循环:

void processParallelyWithExecutorService() throws InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, executorService);
        futures.add(future);
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    executorService.shutdown();
}

上面的代码中有几点需要注意:

  • 我们使用newFixedThreadPool()方法创建一个包含10个线程的线程池
  • 接下来,我们使用CompletableFuture.runAsync()方法将任务提交到线程池,runAsync()方法确保提供给它的任务在单独的线程中异步运行。
  • 该方法接收Callable或Runnable对象作为参数。在本例中,我们使用lambda表达式创建一个Runnable对象。
  • runAsync()方法返回一个CompletableFuture对象,我们将其添加到CompletableFuture对象列表中,以便稍后使用executorService实例中的线程池执行
  • 接下来,我们使用CompletableFuture.allOf()方法组合CompletableFuture对象,并对它们调用join()操作。执行join()时,进程会等待所有CompletableFuture任务并行完成
  • 最后,我们使用shutdown()方法关闭ExecutorService,该方法释放线程池中的所有线程。

4. 使用流进行并行处理

Java 8引入了Stream API,它支持并行处理。让我们探讨一下Stream API如何并行化for循环。

4.1 使用并行流

让我们看看如何使用Stream API的parallel()方法来并行化for循环:

void processParallelyWithStream() {
    IntStream.range(0, 100)
        .parallel()
        .forEach(i -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
}

在上面的代码中,我们使用IntStream.range()方法创建一个整数流。接下来,我们调用parallel()方法来并行化流。

最后,我们调用forEach()方法来处理流的元素。对于每个元素,我们调用Thread.sleep()方法来模拟繁重的操作。

4.2 使用StreamSupport

并行化for循环的另一种方法是使用StreamSupport类,让我们看一下相同的代码:

void processParallelyWithStreamSupport() {
    Iterable<Integer> iterable = () -> IntStream.range(0, 100).iterator();
    Stream<Integer> stream = StreamSupport.stream(iterable.spliterator(), true);
    stream.forEach(i -> {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

StreamSupport类提供了一个stream()方法,该方法接收Iterable对象作为参数。此外,它还需要一个布尔参数来指示流是否应该并行。

在这里,我们使用IntStream.range()方法创建一个Iterable对象。接下来,我们调用stream()方法来创建整数流。最后,我们调用forEach()方法来处理流的元素。

parallel()方法和StreamSupport类的工作方式类似,它们在内部创建线程来处理流的元素,创建的线程数量取决于系统中可用的核心数量

5. 性能比较

现在我们已经了解了并行化for循环的不同方法,让我们比较一下每种方法的性能。为此,我们使用Java Microbenchmark Harness(JMH)。首先,我们需要将JMH依赖项添加到我们的项目中。

接下来,让我们将@BenchmarkMode注解添加到我们的方法中,并使它们能够针对平均时间进行基准测试:

@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void processSerially() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        Thread.sleep(10);
    }
}

同样,让我们对所有并行处理方法执行相同的操作。

为了运行基准测试,我们创建一个main()方法并设置JMH:

class Benchmark {

    public static void main(String[] args) {
        try {
            org.openjdk.jmh.Main.main(new String[] { "cn.tuyucheng.taketoday.concurrent.parallel.Processor" });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

从main()方法中,我们调用JMH的main()方法并将路径作为参数传递给Processor类。这告诉JMH对Processor类的方法运行基准测试。

当我们运行main()方法时,我们会看到以下结果:

从上面的结果可以看出,并行处理元素所花费的时间比顺序处理它们所花费的时间要少得多。

值得注意的是,处理元素所花费的时间可能因系统而异,这取决于系统中可用的核心数量。

此外,每种并行方法每次运行所花费的时间可能会有所不同,并且这些数字并不是这些方法之间的精确比较

6. 总结

在本文中,我们研究了在Java中并行化for循环的不同方法。我们探讨了如何使用ExecutorService接口、Stream API和StreamSupport实用程序来并行化for循环。最后,我们使用JMH比较了每种方法的性能。

与往常一样,示例代码可以在GitHub上找到。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章