Java 中的并发包指的是 java.util.concurrent(简称 JUC)包和其子包下的类和接口,它为 Java 的并发提供了各种功能支持,比如:

  • 提供了线程池的创建类 ThreadPoolExecutor、Executors 等;
  • 提供了各种锁,如 Lock、ReentrantLock 等;
  • 提供了各种线程安全的数据结构,如 ConcurrentHashMap、LinkedBlockingQueue、DelayQueue 等;
  • 提供了更加高级的线程同步结构,如 CountDownLatch、CyclicBarrier、Semaphore 等。

在前面的章节中我们已经详细地介绍了线程池的使用、线程安全的数据结构等,本文我们就重点学习一下 Java 并发包中更高级的线程同步类:CountDownLatch、CyclicBarrier、Semaphore 和 Phaser 等。

CountDownLatch 介绍和使用

CountDownLatch(闭锁)可以看作一个只能做减法的计数器,可以让一个或多个线程等待执行。
CountDownLatch 有两个重要的方法:

  • countDown():使计数器减 1;
  • await():当计数器不为 0 时,则调用该方法的线程阻塞,当计数器为 0 时,可以唤醒等待的一个或者全部线程。

CountDownLatch 使用场景:

以生活中的情景为例,比如去医院体检,通常人们会提前去医院排队,但只有等到医生开始上班,才能正式开始体检,医生也要给所有人体检完才能下班,这种情况就要使用 CountDownLatch,流程为:患者排队 → 医生上班 → 体检完成 → 医生下班。

CountDownLatch 示例代码如下:

// 医院闭锁
CountDownLatch hospitalLatch = new CountDownLatch(1);
// 患者闭锁
CountDownLatch patientLatch = new CountDownLatch(5);
System.out.println("患者排队");
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    final int j = i;
    executorService.execute(() -> {
        try {
            hospitalLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("体检:" + j);
        patientLatch.countDown();
    });
}
System.out.println("医生上班");
hospitalLatch.countDown();
patientLatch.await();
System.out.println("医生下班");
executorService.shutdown();

以上程序执行结果如下:

患者排队

医生上班

体检:4

体检:0

体检:1

体检:3

体检:2

医生下班

执行流程如下图:

7871628265914_.pic.jpg

CyclicBarrier 介绍和使用

CyclicBarrier(循环屏障)通过它可以实现让一组线程等待满足某个条件后同时执行。

CyclicBarrier 经典使用场景是公交发车,为了简化理解我们这里定义,每辆公交车只要上满 4 个人就发车,后面来的人都会排队依次遵循相应的标准。

它的构造方法为 CyclicBarrier(int parties,Runnable barrierAction) 其中,parties 表示有几个线程来参与等待,barrierAction 表示满足条件之后触发的方法。CyclicBarrier 使用 await() 方法来标识当前线程已到达屏障点,然后被阻塞。

CyclicBarrier 示例代码如下:

import java.util.concurrent.*;
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("发车了");
            }
        });
        for (int i = 0; i < 4; i++) {
            new Thread(new CyclicWorker(cyclicBarrier)).start();
        }
    }
    static class CyclicWorker implements Runnable {
        private CyclicBarrier cyclicBarrier;
        CyclicWorker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                System.out.println("乘客:" + i);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

以上程序执行结果如下:


乘客:0

乘客:0

乘客:0

乘客:0

发车了

乘客:1

乘客:1

乘客:1

乘客:1

发车了

执行流程如下图:

7881628265926_.pic.jpg

Semaphore 介绍和使用

Semaphore(信号量)用于管理多线程中控制资源的访问与使用。Semaphore 就好比停车场的门卫,可以控制车位的使用资源。比如来了 5 辆车,只有 2 个车位,门卫可以先放两辆车进去,等有车出来之后,再让后面的车进入。

Semaphore 示例代码如下:

Semaphore semaphore = new Semaphore(2);
ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
    semaphoreThread.execute(() -> {
        try {
            // 堵塞获取许可
            semaphore.acquire();
            System.out.println("Thread:" + Thread.currentThread().getName() + " 时间:" + LocalDateTime.now());
            TimeUnit.SECONDS.sleep(2);
            // 释放许可
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
复制
以上程序执行结果如下:

Thread:pool-1-thread-1 时间:2019-07-10 21:18:42

Thread:pool-1-thread-2 时间:2019-07-10 21:18:42

Thread:pool-1-thread-3 时间:2019-07-10 21:18:44

Thread:pool-1-thread-4 时间:2019-07-10 21:18:44

Thread:pool-1-thread-5 时间:2019-07-10 21:18:46

执行流程如下图:

7891628265937_.pic.jpg

Phaser 介绍和使用

Phaser(移相器)是 JDK 7 提供的,它的功能是等待所有线程到达之后,才继续或者开始进行新的一组任务。

比如有一个旅行团,我们规定所有成员必须都到达指定地点之后,才能发车去往景点一,到达景点之后可以各自游玩,之后必须全部到达指定地点之后,才能继续发车去往下一个景点,类似这种场景就非常适合使用 Phaser。

Phaser 示例代码如下:

public class Lesson5_6 {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new MyPhaser();
        PhaserWorker[] phaserWorkers = new PhaserWorker[5];
        for (int i = 0; i < phaserWorkers.length; i++) {
            phaserWorkers[i] = new PhaserWorker(phaser);
            // 注册 Phaser 等待的线程数,执行一次等待线程数 +1
            phaser.register();
        }
        for (int i = 0; i < phaserWorkers.length; i++) {
            // 执行任务
            new Thread(new PhaserWorker(phaser)).start();
        }
    }
    static class PhaserWorker implements Runnable {
        private final Phaser phaser;
        public PhaserWorker(Phaser phaser) {
            this.phaser = phaser;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " | 到达" );
            phaser.arriveAndAwaitAdvance(); // 集合完毕发车
            try {
                Thread.sleep(new Random().nextInt(5) * 1000);
                System.out.println(Thread.currentThread().getName() + " | 到达" );
                phaser.arriveAndAwaitAdvance(); // 景点 1 集合完毕发车
                Thread.sleep(new Random().nextInt(5) * 1000);
                System.out.println(Thread.currentThread().getName() + " | 到达" );
                phaser.arriveAndAwaitAdvance(); // 景点 2 集合完毕发车
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // Phaser 每个阶段完成之后的事件通知
    static class MyPhaser extends  Phaser{
        @Override
        protected boolean onAdvance(int phase, int registeredParties) { // 每个阶段执行完之后的回调
            switch (phase) {
                case 0:
                    System.out.println("==== 集合完毕发车 ====");
                    return false;
                case 1:
                    System.out.println("==== 景点1集合完毕,发车去下一个景点 ====");
                    return false;
                case 2:
                    System.out.println("==== 景点2集合完毕,发车回家 ====");
                    return false;
                default:
                    return true;
            }
        }
    }
}

以上程序执行结果如下:

Thread-0 | 到达

Thread-4 | 到达

Thread-3 | 到达

Thread-1 | 到达

Thread-2 | 到达

==== 集合完毕发车 ====

Thread-0 | 到达

Thread-4 | 到达

Thread-1 | 到达

Thread-3 | 到达

Thread-2 | 到达

==== 景点1集合完毕,发车去下一个景点 ====

Thread-4 | 到达

Thread-3 | 到达

Thread-2 | 到达

Thread-1 | 到达

Thread-0 | 到达

==== 景点2集合完毕,发车回家 ====

执行流程如下图:

7901628265945_.pic.jpg

文章目录