Java中的CyclicBarrier

2023/06/07

1. 概述

CyclicBarriers是作为java.util.concurrent包的一部分随Java 5引入的同步结构。

在本文中,我们将在并发场景中探讨此实现。

2. Java并发-同步器

java.util.concurrent包含一些有助于管理一组相互协作的线程的类。其中包括:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

这些类为线程之间的常见交互模式提供了开箱即用的功能。

如果我们有一组相互通信的线程,它们类似于一种常见的管理线程协作的模式,那么我们可以简单地重用适当的类(也称为同步器),而不是尝试使用一组锁和Condition对象以及synchronized关键字来创建自定义方案

让我们继续关注CyclicBarrier。

3. CyclicBarrier

CyclicBarrier是一个同步器,它允许一组线程相互等待到达一个共同的执行点,也称为屏障。

CyclicBarrier用于在程序中,我们有固定数量的线程,这些线程在继续执行之前必须相互等待到达公共点。

屏障之所以称为循环屏障,是因为它可以在等待线程被释放后重新使用

4. 用法

CyclicBarrier的构造函数很简单。它接收单个整数值表示需要在CyclicBarrier实例上调用await()方法以表示达到公共执行点的线程数:

public CyclicBarrier(int parties)

需要同步执行的线程也被称为parties(参与方),调用await()方法是我们注册某个线程已到达屏障点的方式。

此调用是同步的,调用此方法的线程会暂停执行,直到指定数量的线程在屏障上调用了相同的方法。在这种情况下,所需数量的线程调用了await(),称为触发屏障

或者,我们可以将第二个参数传递给构造函数,它是一个Runnable实例。它的逻辑将由最后一个到达屏障的线程运行:

public CyclicBarrier(int parties, Runnable barrierAction)

5. 实现

要了解CyclicBarrier的实际应用,让我们考虑以下场景:

有一个操作由固定数量的线程执行并将相应的结果存储在列表中。当所有线程完成它们的操作时,其中一个线程(通常是最后一个触发屏障的线程)开始处理每个线程获取的数据。

让我们实现所有操作发生的主类:

public class CyclicBarrierDemo {
    private CyclicBarrier cyclicBarrier;
    private final List<List<Integer>> partialResults = Collections.synchronizedList(new ArrayList<>());
    private final Random random = new Random();
    private int NUM_PARTIAL_RESULTS;
    private int NUM_WORKERS;
    // ...
}

这个类非常简单-NUM_WORKERS是将要执行的线程数,NUM_PARTIAL_RESULTS是每个工作线程将要生成的结果数。

最后,我们有一个partialResults列表,它将存储每个工作线程的结果。请注意,此列表是一个SynchronizedList,因为多个线程将同时向其写入数据,而add()方法在普通的ArrayList上不是线程安全的。

现在让我们实现每个工作线程的逻辑:

public class CyclicBarrierDemo {
    // ...

    class NumberCruncherThread implements Runnable {

        @Override
        public void run() {
            String thisThreadName = currentThread().getName();
            List<Integer> partialResult = new ArrayList<>();

            // Crunch some numbers and store the partial result
            for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {
                int num = random.nextInt(10);
                System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num);
                partialResult.add(num);
            }
            partialResults.add(partialResult);
            try {
                System.out.println(thisThreadName + " waiting for others to reach barrier");
                cyclicBarrier.await();
            } catch (BrokenBarrierException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

现在,我们将实现当屏障被触发时运行的逻辑。

为了简单起见,我们将partialResults列表中的所有数字相加:

public class CyclicBarrierDemo {

    // ...

    class AggregatorThread implements Runnable {

        @Override
        public void run() {
            String thisThreadName = currentThread().getName();
            System.out.println(thisThreadName + ": Computing final sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
            int sum = 0;
            
            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult + " ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(Thread.currentThread().getName() + ": Final result = " + sum);
        }
    }
}

最后一步是构造CyclicBarrier并使用main()方法启动:

public class CyclicBarrierDemo {

    // Previous code

    public static void main(String[] args) {
        CyclicBarrierDemo play = new CyclicBarrierDemo();
        play.runSimulation(5, 3);
    }

    private void runSimulation(int numWorkers, int numberOfPartialResults) {
        NUM_PARTIAL_RESULTS = numberOfPartialResults;
        NUM_WORKERS = numWorkers;
        
        cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());
        
        System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each");
        for (int i = 0; i < NUM_WORKERS; i++) {
            Thread worker = new Thread(new NumberCruncherThread());
            worker.start();
        }
    }
}

在上面的代码中,我们用5个线程初始化CyclicBarrier,每个线程生成3个整数作为它们计算的一部分,并将其存储在partialResult列表中。

一旦屏障被触发,触发屏障的最后一个线程将执行AggregatorThread中指定的逻辑,即将线程生成的所有数字相加。

6. 输出结果

这是上述程序的一次执行的输出,每次执行可能会产生不同的结果,因为线程可以以不同的顺序生成:

Spawning 5 worker threads to compute 3 partial results each
Thread 3: Crunching some numbers! Final result - 3
Thread 3: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 4
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 3
Thread 0: Crunching some numbers! Final result - 4
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 1
Thread 3: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 0
Thread 0 waiting for others to reach barrier.
Thread 2 waiting for others to reach barrier.
Thread 3 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 1
Thread 1: Crunching some numbers! Final result - 9
Thread 1 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 0
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 worker, having 3 results each.
4 1 3 
4 6 3 
3 2 2 
0 1 9 
1 0 0 
Thread 4: Final result = 39

如上面的输出所示,线程4是触发屏障并执行最终聚合逻辑的线程。线程实际上也没有必要按照上面示例所示的启动顺序运行。

7. 总结

在本文中,我们了解了CyclicBarrier是什么,以及它在什么样的情况下有用。

我们还实现了一个场景,在继续其他程序逻辑之前,我们需要固定数量的线程才能到达固定的执行点。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章