在Java中同时启动两个线程

2023/06/07

1. 概述

多线程编程允许我们并发运行线程,每个线程可以处理不同的任务。因此,它可以最佳地利用资源,特别是当我们的计算机具有多个CPU或CPU有多个核心时。

有时,我们希望控制多个线程同时启动。

在本教程中,我们将首先说明我们的需求,尤其是“完全相同的时间”的含义。此外,我们还将讨论如何在Java中同时启动两个线程。

2. 了解需求

我们的需求是:“同时启动两个线程”。

这个要求看起来很容易理解。但是,如果我们仔细考虑一下,是否有可能完全同时启动两个线程?

首先,每个线程都会消耗CPU时间来工作。因此,如果我们的应用程序在具有单核CPU的计算机上运行,则不可能同时启动两个线程

如果我们的计算机具有多个CPU或多个CPU核心,两个线程可能会同时启动。但是,我们无法在Java中控制它。

这是因为当我们在Java中使用线程时,Java线程调度依赖于操作系统的线程调度。因此,不同的操作系统可能会以不同的方式处理它。

此外,根据爱因斯坦的狭义相对论,如果我们以更严格的方式讨论“完全相同的时间”:

从绝对意义上说,如果两个不同的事件在空间上是分开的,就不可能同时发生这些事件。

无论我们的CPU在主板上或CPU中的内核有多近,都有空间。因此,我们无法确保两个线程同时启动。

那么,这是否意味着该需求无效?

不,这是一个有效的要求。即使我们不能让两个线程完全同时启动,我们也可以通过一些同步技术非常接近。

在大多数实际情况下,当我们需要两个线程“同时”启动时,这些技术可能会对我们有所帮助。

在本教程中,我们将探讨解决此问题的几种方法:

所有方法都遵循相同的思想:我们不会真正同时启动两个线程。相反,我们在线程启动后立即阻塞线程,并尝试同时恢复它们的执行

由于我们的测试与线程调度相关,因此值得一提的是本教程中运行测试的环境:

  • CPU:11th Gen Intel(R) Core(TM) i5-1135G7。处理器时钟在2.6和4.3GHz之间(4.1 4核,4 GHz 6核)
  • 操作系统:内核版本为5.12.12的64位Linux
  • Java:Java 17

现在,让我们看看CountDownLatch和CyclicBarrier的实际应用。

3. 使用CountDownLatch类

CountDownLatch是Java 5中引入的一个同步器,作为java.util.concurrent包的一部分。通常,我们使用CountDownLatch来阻塞线程,直到其他线程完成它们的任务

简单地说,我们在latch对象中设置一个计数,然后将latch对象关联到一些线程上。当我们启动这些线程时,它们将被阻塞,直到latch的计数变为零。

另一方面,在其他线程中,我们可以控制在什么条件下减少计数并让阻塞的线程恢复,例如,当主线程中的某些任务完成时。

3.1 工作线程

现在,让我们看看如何使用CountDownLatch类解决我们的问题。

首先,我们创建一个线程类。让我们称之为WorkerWithCountDownLatch:

public class WorkerWithCountDownLatch extends Thread {
    private final CountDownLatch latch;

    public WorkerWithCountDownLatch(String name, CountDownLatch latch) {
        this.latch = latch;
        setName(name);
    }

    @Override
    public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the latch...\n", getName());
            latch.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here ...
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

我们在WorkerWithCountDownLatch类中添加了一个latch对象。首先,让我们来了解一下latch对象的功能。

在run()方法中,我们调用了latch.await()方法。这意味着,如果我们启动工作线程,它将检查latch的计数。线程将被阻塞,直到计数为零。

这样,我们就可以在主线程中创建一个count=1的CountDownLatch(1)锁存器,并将latch对象关联到我们要同时启动的两个工作线程。

当我们希望这两个线程继续执行它们的实际工作时,我们通过在主线程中调用latch.countDown()来释放latch。

接下来,我们来看看主线程是如何控制两个工作线程的。

3.2 主线程

我们将在usingCountDownLatch()方法中实现主线程的逻辑:

private static void usingCountDownLatch() throws InterruptedException {
    System.out.println("===============================================");
    System.out.println("        >>> Using CountDownLatch <<<<");
    System.out.println("===============================================");

    CountDownLatch latch = new CountDownLatch(1);

    WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch);
    WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch);

    worker1.start();
    worker2.start();

    Thread.sleep(10); // simulation of some actual work

    System.out.println("-----------------------------------------------");
    System.out.println(" Now release the latch:");
    System.out.println("-----------------------------------------------");
    latch.countDown();
}

现在,让我们从main()方法中调用上面的usingCountDownLatch()方法。当我们运行main()方法时,我们将看到输出:

===============================================
        >>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
 Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z

如上面的输出所示,两个工作线程几乎同时启动。两个开始时间之间的差异小于2微秒

4. 使用CyclicBarrier类

CyclicBarrier类是Java 5中引入的另一个同步器。本质上,CyclicBarrier允许固定数量的线程在继续执行之前相互等待达到一个公共点

接下来,让我们看看我们如何使用CyclicBarrier类来解决我们的问题。

4.1 工作线程

我们先来看看我们的工作线程的实现:

public class WorkerWithCyclicBarrier extends Thread {
    private final CyclicBarrier barrier;

    public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) {
        this.barrier = barrier;
        this.setName(name);
    }

    @Override
    public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the barrier\n", getName());
            barrier.await();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实现非常简单。我们将barrier对象与工作线程相关联。当线程启动时,我们立即调用barrier.await()方法。

这样,工作线程就会被阻塞等待各方调用barrier.await()来恢复。

4.2 主线程

接下来我们看看如何控制两个工作线程在主线程中恢复:

public class ThreadsStartAtSameTime {

    private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException {
        System.out.println("\n===============================================");
        System.out.println("        >>> Using CyclicBarrier <<<<");
        System.out.println("===============================================");

        CyclicBarrier barrier = new CyclicBarrier(3);

        WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier);
        WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier);

        worker1.start();
        worker2.start();

        Thread.sleep(10); // simulation of some actual work

        System.out.println("-----------------------------------------------");
        System.out.println(" Now open the barrier:");
        System.out.println("-----------------------------------------------");
        barrier.await();
    }
}

我们的目标是让两个工作线程同时恢复。因此,算上主线程,我们总共有3个线程。

如上面的方法所示,我们在主线程中创建了一个包含3个parties(参与线程数)的屏障对象。接下来,我们创建并启动两个工作线程。

正如我们之前所讨论的,两个工作线程被阻塞并等待屏障的打开以恢复。

在主线程中,我们可以做一些实际的工作。当我们决定打开屏障时,我们调用方法barrier.await()让两个工作线程继续执行。

如果我们在main()方法中调用usingCyclicBarrier(),我们将得到输出:

===============================================
        >>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
 Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z

我们可以比较两个工作线程的开始时间。即使两个worker不是同时启动,但也非常接近我们的目标:两个启动时间之间的差异小于3微秒。

5. 使用Phaser类

Phaser类是Java 7中引入的同步器。它类似于CyclicBarrier和CountDownLatch。但是,Phaser类更灵活。

例如,与CyclicBarrier和CountDownLatch不同,Phaser允许我们动态注册参与线程数

接下来,让我们使用Phaser来解决我们的问题。

5.1 工作线程

像往常一样,我们先看一下实现,然后了解它是如何工作的:

public class WorkerWithPhaser extends Thread {
    private final Phaser phaser;

    public WorkerWithPhaser(String name, Phaser phaser) {
        this.phaser = phaser;
        phaser.register();
        setName(name);
    }

    @Override
    public void run() {
        try {
            System.out.printf("[ %s ] created, blocked by the phaser\n", getName());
            phaser.arriveAndAwaitAdvance();
            System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now());
            // do actual work here...
        } catch (IllegalStateException e) {
            Thread.currentThread().interrupt();
        }
    }
}

当工作线程被实例化时,我们通过调用phaser.register()将当前线程注册到给定的Phaser对象。这样,当前工作线程就成为Phaser的一个参与线程。

接下来,当工作线程启动时,我们立即调用phaser.arriveAndAwaitAdvance()。因此,我们告诉Phaser当前线程已经到达,并将等待其他参与线程的到达来继续进行。当然,在其他参与线程到达之前,当前线程就被阻塞了。

5.2 主线程

接下来我们继续看主线程的实现:

public class ThreadsStartAtSameTime {

    private static void usingPhaser() throws InterruptedException {
        System.out.println("\n===============================================");
        System.out.println("        >>> Using Phaser <<<");
        System.out.println("===============================================");

        Phaser phaser = new Phaser();
        phaser.register();

        WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser);
        WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser);

        worker1.start();
        worker2.start();

        Thread.sleep(10); // simulation of some actual work

        System.out.println("-----------------------------------------------");
        System.out.println(" Now open the phaser barrier:");
        System.out.println("-----------------------------------------------");
        phaser.arriveAndAwaitAdvance();
    }
}

在上面的代码中,我们可以看到,主线程也将自己注册为Phaser对象的参与线程

在我们创建并阻塞了两个工作线程之后,主线程也会调用phaser.arriveAndAwaitAdvance()。这样,我们打开了Phaser屏障,让两个工作线程可以同时恢复执行。

最后,让我们在main()方法中调用usingPhaser()方法:

===============================================
        >>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser
[ Worker with phaser 2 ] created, blocked by the phaser
-----------------------------------------------
 Now open the phaser barrier:
-----------------------------------------------
[ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z
[ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z

同样,两个工作线程几乎同时启动。两个开始时间之间的差异小于2微秒

6. 总结

在本文中,我们首先讨论了需求:“同时启动两个线程”。

接下来,我们讨论了同时启动三个线程的几种方法:使用CountDownLatch、CyclicBarrier和Phaser。

他们的思路很相似,阻塞两个线程并试图让它们同时恢复执行

在我的个人笔记本上,运行多次之后总是会输出相同的时间,这意味着它们确实是同时恢复执行的,这取决于具体的运行环境。即使不能做到完全同时,但对于现实世界的大多数情况来说,结果非常接近且足够。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章