java.util.concurrent概述

2023/06/07

1. 概述

java.util.concurrent包提供了创建用于并发应用程序的工具类。在本文中,我们将对这个包进行简要介绍。

2. 主要组件

java.util.concurrent包含的功能太多,无法在一篇文章中讲解。在本文中,我们将主要关注该包中一些最有用的类,如:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

2.1 Executor

Executor是一个接口,表示执行所提供任务的对象

如果任务应该在新线程或当前线程上运行,这取决于特定的实现(从哪里发起调用)。因此,使用这个接口,我们可以将任务执行流程与实际的任务执行机制解耦。

这里需要注意的一点是,Executor并不严格要求任务执行是异步的。在最简单的情况下,executor可以在调用线程中立即调用提交的任务。

我们需要创建一个Invoker来创建Executor实例:

public class Invoker implements Executor {

    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

现在,我们可以使用这个invoker来执行任务。

public class ExecutorDemo {
    public void execute() {
        Executor executor = new Invoker();
        executor.execute(() -> {
            // task to be performed ...
        });
    }
}

这里需要注意的一点是,如果invoker不能接受任务执行,它将抛出RejectedExecutionException

2.2 ExecutorService

ExecutorService是异步处理的完整解决方案。它管理内存队列,并根据可用的线程安排提交的任务调度。

要使用ExecutorService,我们需要创建一个实现Runnable的类。

public class Task implements Runnable {
    @Override
    public void run() {
        // task details ...
    }
}

现在我们可以创建ExecutorService实例并分配这个任务。在创建时,我们需要指定线程池的大小。

ExecutorService executor = Executors.newFixedThreadPool(10);

如果我们想创建一个单线程的ExecutorService实例,可以使用newSingleThreadExecutor(ThreadFactory threadFactory)创建。

一旦创建了executor,我们就可以使用它来提交任务。

public void execute() { 
    executor.submit(new Task()); 
}

我们也可以在提交任务的同时创建Runnable实例:

executor.submit(() -> {
    new Task();
});

它还提供了两个开箱即用的执行终止方法。第一个是shutdown();它将等待所有提交的任务完成执行。另一个方法是shutdownNow(),它会立即终止所有正在执行的任务并停止等待任务的处理。

还有另一种方法awaitTermination(long timeout, TimeUnit unit),它强制阻塞,直到所有任务在触发关闭事件或发生执行超时后完成执行,或者执行线程本身被中断。

try {
    executor.awaitTermination(20L, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

2.3 ScheduledExecutorService

ScheduleExecutorService与ExecutorService类似,但它可以定期执行任务。

Executor和ExecutorService的方法立即安排执行,不会引入任何人为延迟。零或任何负值表示请求需要立即执行。

我们可以使用Runnable和Callable接口来定义任务。

public class ScheduledExecutorServiceDemo {

    public void execute() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

        Future<String> future = executorService.schedule(() -> {
            // ...
            return "Hello world";
        }, 1, TimeUnit.SECONDS);

        ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
            // ...
        }, 1, TimeUnit.SECONDS);

        executorService.shutdown();
    }
}

ScheduledExecutorService也可以在给定的固定延迟后调度任务:

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

这里,scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)方法创建并执行一个周期性任务,该任务在提供的initialDelay之后首先被调用,然后在给定的period内被调用,直到executorService关闭。

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)方法创建并执行一个周期性任务,该任务在提供的initialDelay后首先被调用,并在执行任务终止和调用下一个任务之间以给定的delay重复调用。

2.4 Future

Future用于表示异步操作的结果。它提供了检查异步操作是否完成、获取计算结果等方法。

更重要的是,cancel(boolean mayInterruptIfRunning)方法会取消任务并释放正在执行的线程。如果mayInterruptIfRunning的值为true,则执行任务的线程将立即终止。

否则,将允许完成正在进行的任务。

我们可以使用下面的代码片段来创建Future的实例:

public class FutureDemo {

    public void invoke() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<String> future = executorService.submit(() -> {
            // ...
            Thread.sleep(10000L);
            return "Hello world";
        });
    }
}

我们可以使用以下代码片段来检查Future的结果是否准备就绪,并在计算完成后获取数据:

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

我们还可以为给定的任务指定超时。如果任务花费的时间超过此时间,则会抛出TimeoutException:

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

2.5 CountDownLatch

CountDownLatch(在JDK 5中引入)是一个同步器类,它阻塞一组线程,直到某个操作完成。

CountDownLatch使用counter(Integer type)初始化;随着相关线程完成执行,该计数器递减。一旦计数器达到零,其他线程就会被释放。

你可以在此处了解有关CountDownLatch的更多信息。

2.6 CyclicBarrier

CyclicBarrier的工作原理与CountDownLatch几乎相同,只是我们可以重用它。与CountDownLatch不同的是,它允许多个线程在调用最终任务之前使用await()方法(称为屏障条件)互相等待。

我们需要创建一个Runnable的任务实例来启动屏障条件:

public class Task implements Runnable {

    private final CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Thread : " + Thread.currentThread().getName() + " is waiting");
            barrier.await();
            System.out.println("Thread : " + Thread.currentThread().getName() + " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

现在我们可以调用一些线程来竞争屏障条件:

public class CyclicBarrierExample {

    public void start() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            // Task
            System.out.println("All previous tasks are completed");
        });

        Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
        Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
        Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

        if (!cyclicBarrier.isBroken()) {
            t1.start();
            t2.start();
            t3.start();
        }
    }
}

这里,isBroken()方法检查执行期间是否有任何线程被中断。在执行实际流程之前,我们应该始终执行此检查。

2.7 Semaphore

信号量Semaphore用于阻塞线程对物理或逻辑资源的某些部分的访问。一个信号量包含一组许可证;每当一个线程试图进入临界区时,它都需要检查信号量是否有许可证可用。

如果许可证不可用(通过tryAcquire()检查),则不允许线程进入临界区;但是,如果许可证可用,则会授予访问权限,并且许可证计数器会减1

一旦执行线程退出临界区,许可证计数器再次加1(通过release()方法完成)。

我们可以使用tryAcquire(long timeout, TimeUnit unit)方法指定获取访问权限的超时时间。

我们还可以检查可用许可证的数量或等待获取信号量的线程的数量

以下代码片段可用于实现信号量:

public class SemaphoreDemo {
    static Semaphore semaphore = new Semaphore(10);

    public void execute() throws InterruptedException {

        System.out.println("Available permit : " + semaphore.availablePermits());
        System.out.println("Number of threads waiting to acquire: " + semaphore.getQueueLength());

        if (semaphore.tryAcquire()) {
            try {
                // perform some critical operations
            } finally {
                semaphore.release();
            }
        }
    }
}

我们可以使用Semaphore实现类似Mutex(互斥)的数据结构,可以在此处找到有关此内容的更多详细信息。

2.8 ThreadFactory

顾名思义,ThreadFactory充当一个线程(不存在)池,根据需要创建一个新线程。它消除了实现高效线程创建机制所需的大量样板代码。

我们可以定义一个ThreadFactory:

public class TuyuchengThreadFactory implements ThreadFactory {
    private int threadId;
    private final String name;

    public TuyuchengThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        System.out.println("created new thread with id : " + threadId + " and name : " + t.getName());
        threadId++;
        return t;
    }
}

我们可以使用这个newThread(Runnable r)方法在运行时创建一个新线程:

public class Demo {

    public void execute() {
        TuyuchengThreadFactory factory = new TuyuchengThreadFactory("TuyuchengThreadFactory");
        for (int i = 0; i < 10; i++) {
            Thread t = factory.newThread(new Task());
            t.start();
        }
    }
}

2.9 BlockingQueue

在异步编程中,最常见的集成模式之一是生产者-消费者模式。java.util.concurrent包中包含一个名为BlockingQueue的数据结构,在这些异步场景中非常有用。

有关这方面的更多信息,请参见此处

2.10 DelayQueue

DelayQueue是一个无限大小元素的阻塞队列,其中一个元素只有在其过期时间(称为用户定义的延迟)完成时才能被pull。因此,最顶端的元素(head)将具有最大的延迟量,并且它将最后被轮询。

此处提供了有关此的更多信息和工作示例。

2.11 Lock

Lock是一个用于阻止其他线程(除了当前正在执行该代码段的线程)访问特定代码段的工具。

Lock和synchronized之间的主要区别在于,同步块完全包含在单个方法中;但是,我们可以在不同的方法中使用Lock API的lock()和unlock()方法。

有关这方面的更多信息,请参见此处

2.12 Phaser

Phaser是一种比CyclicBarrier和CountDownLatch更灵活的解决方案,用于充当一个可重用的屏障,在继续执行之前需要等待动态数量的线程。我们可以协调多个执行阶段,为每个程序阶段重用一个Phaser实例。

有关这方面的更多信息,请参见此处

3. 总结

在这篇文章中,我们重点介绍了java.util.concurrent包中可用的不同并发工具类。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章