• 多线程与高并发(8)—— 从CountDownLatch总结AQS共享锁(三周年打卡)


    不知不觉已经写文章3年了,3年只写了六十多篇文章,略有惭愧。希望后续一段时间能迎来一波创作的爆发,也能迎来自己技术的爆发。加油吧,翠花!

    一 、概述

    上篇文章:多线程与高并发(7)——从ReentrantLock到AQS源码详细讲解了AQS的原理,我们知道使用 AQS 能简单且高效地构造出同步器(重写tryAcquire、tryRelease就能简单实现)。这里呢,JDK给我们提供了很多高效同步器,如CountDownLatch 、CyclicBarrier、Phaser、Semaphore、Exchanger、ReentrantReadWriteLock,我们后面文章逐一讲解分析。
    这里,我们会通过CountDownLatch 总结AQS共享锁的原理,同时对比下CyclicBarrier和Phaser。

    二、CountDownLatch

    CountDownLatch字面意思是倒数门栓,也就是倒数计数多少个线程执行完毕了。 其允许 int个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

    1、常用方法

    await(): 调用该方法的线程处于等待状态,直到latch的值被减到0或者当前线程被中断。一般都是主线程调用。——开门
    await(long timeout, TimeUnit unit):带超时时间的await。
    countDown():使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。——倒数
    getCount():获得latch值。

    2、两种典型用法

    1、执行完所有的业务后才执行主线程
    将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。比如百度的文本转语音的代码,对于长文本来说,需要分段来转,需每段都转换完成才能整合整个文本。
    我们再举一个例子,比如一场考试,有30个考生(线程),一个监考老师(主线程),每个考生完成试卷之后都可以提前交卷,且交卷后不需要管其他考试如何,考试结束后(所有学生交卷,latch为0),监考老师才能离开(主线程结束)。
    30个学生输出结果太长,我们换成5个学生,代码如下:

     public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(5);
            //指定线程数
            ExecutorService threadPool = Executors.newFixedThreadPool(30);
            for (int i = 0; i < 5; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println("学生"+ finalI +"交卷");
                    countDownLatch.countDown();
                });
            }
            try {
                //开门
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadPool.shutdown();
            System.out.println("考试结束,监考老师离场");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    运行结果如下,可以看出学生交卷是无序的,但是老师必须等所有学生交完卷才能离场:

    学生0交卷
    学生2交卷
    学生3交卷
    学生1交卷
    学生4交卷
    考试结束,监考老师离场
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、某一时刻,所有线程一起执行。
    此作用是实现多个线程开始执行任务的最大并行性。
    所谓并行,强调的是多个线程在某一时刻同时开始执行。而并发在围观上仍然是顺序执行。
    类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。
    代码如下:

     public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CountDownLatch countDownLatch2 = new CountDownLatch(5);
            //指定线程数
            ExecutorService threadPool = Executors.newFixedThreadPool(30);
            for (int i = 0; i < 5; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    try {
                        System.out.println("运动员"+ finalI +"正在准备");
                        //线程阻塞,等着被释放
                        countDownLatch.await();
                        Thread.sleep(100);
                        System.out.println("运动员"+ finalI +"冲刺");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    countDownLatch2.countDown();
                    System.out.println("运动员"+ finalI +"完成比赛");
    
                });
            }
            try {
                Thread.sleep(1000);
                System.out.println("所有运动员准备完毕,开跑");
                //解锁释放上面的所有线程
                countDownLatch.countDown();
                //所有线程完成比赛,往下走
                countDownLatch2.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadPool.shutdown();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    执行结果如下:

    运动员1正在准备
    运动员4正在准备
    运动员3正在准备
    运动员0正在准备
    运动员2正在准备
    所有运动员准备完毕,开跑
    运动员0冲刺
    运动员1冲刺
    运动员4冲刺
    运动员3冲刺
    运动员2冲刺
    运动员3完成比赛
    运动员4完成比赛
    运动员1完成比赛
    运动员0完成比赛
    运动员2完成比赛
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3、源码解析

    有了上一篇文章的基础,我们这里讲共享锁的流程会更加的得心应手。

    CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown()方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

    首先,我们看一下它的构造方法:

    public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        ...
     Sync(int count) {
                setState(count);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    可以看出来,在进行new对象的时候,count就是state的初始值。
    然后,我们再看一下await()方法:

     public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • 1
    • 2
    • 3

    其调用了sync的acquireSharedInterruptibly方法,也是AQS的方法:

     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
                // 如果被中断,抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
                // 尝试获取同步状态
            if (tryAcquireShared(arg) < 0)
            	// 获取同步状态失败,自旋
                doAcquireSharedInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    tryAcquireShared()方法为CountDownLatch 重写的方法,如下:

    protected int tryAcquireShared(int acquires) {
    			//当前状态是否为0可释放锁
                return (getState() == 0) ? 1 : -1;
            }
    
    • 1
    • 2
    • 3
    • 4

    doAcquireSharedInterruptibly代码如下:

     private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //将当前线程加入同步队列的尾部,addWaiter可以看上一篇文章
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
            	//自旋
                for (;;) {
                //当前节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是头结点,则尝试获取同步状态
                    if (p == head) {
                    	//当前节点尝试获取同步状态
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                        	//如果获取成功,则设置当前节点为头结点并唤醒下一个线程
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //如果当前节点的前驱不是头结点,尝试挂起当前线程,和独享锁相同
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                	//取消获取锁,和独享锁相同
                    cancelAcquire(node);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    下面,我们看一下countDown() 方法的源码:

    public void countDown() {
            sync.releaseShared(1);
        }
    
    • 1
    • 2
    • 3

    其同样调用了AQS的releaseShared()方法:

    public final boolean releaseShared(int arg) {
    		//获取释放同步状态
            if (tryReleaseShared(arg)) {
            	// 如果成功,进入自旋,尝试唤醒同步队列中头结点的后继节点
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    tryReleaseShared()代码如下:

     protected boolean tryReleaseShared(int releases) {
                // 就一个意思,自旋,直到state状态-1为0
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    doReleaseShared()方法如下:

     private void doReleaseShared() {
     		//上来先自旋
            for (;;) {
            	//获取头节点
                Node h = head;
                if (h != null && h != tail) {
                	//头结点状态
                    int ws = h.waitStatus;
                    //如果是SIGNAL,尝试唤醒后继节点
                    if (ws == Node.SIGNAL) {
                    	//只要head成功的从SIGNAL修改为0,那么head的后继节点对应的线程将会被唤醒。
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        //把下一个不为空的节点unpark
                        unparkSuccessor(h);
                    }
                    //其他时候不唤醒
                    else if (ws == 0 &&
                    //前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。不停地for
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 如果head没有改变,则调用break退出循环
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    当头结点的后继节点被唤醒后,线程将从挂起的地方醒来,继续执行。当前状态若还>0,则设置当前节点为头结点。setHeadAndPropagate()代码如下:

     private void setHeadAndPropagate(Node node, int propagate) {
     		//当前头节点
            Node h = head; // Record old head for check below
            //设置当前节点为头节点
            setHead(node);
            //如果执行这个函数,那么propagate一定等于1
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                //获取当前节点的下一个节点
                Node s = node.next;
                //唤醒后续节点
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    嗯,写着写着有点乱了,画个图冷静一下:
    在这里插入图片描述
    蓝色为CountDownLatch,黄色为AQS源码。

    三、CyclicBarrier

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的栅栏(Barrier)。一组线程到达一个栅栏(也可以叫同步点)时被阻塞,直到最后一个线程到达栅栏时,栅栏才会开门,所有被拦截的线程才会继续干活。
    它和CountDownLatch的区别是,比如考试,CountDownLatch中前一个考生完全不用管后一个考生如何。但是CyclicBarrier中,必须所有人到达,比如小学生出去旅游,回家时,必须每个都上车能回家。
    核心方法:
    await() :在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
    CyclicBarrier(int parties, Runnable barrierAction): 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
    reset():将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。
    使用代码如下:

     public static void main(String[] args) {
            //每5个人发车
            CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
            //指定线程数
            ExecutorService threadPool = Executors.newFixedThreadPool(30);
            for (int i = 0; i < 5; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    System.out.println("学生"+ finalI +"上车");
                    try {
                        //等待以保证子线程执行结束
                        cyclicBarrier.await(60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                    System.out.println("学生"+ finalI +"出发");
                });
            }
            threadPool.shutdown();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    执行结果如下:

    学生1上车
    学生3上车
    学生4上车
    学生2上车
    学生0上车
    学生0出发
    学生3出发
    学生1出发
    学生2出发
    学生4出发
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    可以看出,所有学生到齐了才出发。
    CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
    扩展:
    Phaser类又是什么呢?
    Phaser 是是一个可重用的同步栅栏,它的功能与 CountDownLatch、CyclicBarrier 相似,但是可以用来解决控制多个线程分阶段共同完成任务的情景问题。——分段栅栏
    也有点类似于CyclicBarrier的barrierAction,这里就不过多总结了。

  • 相关阅读:
    -Xms512m -Xmx1024m 等参数的含义解释
    Boost电压增益分析(输入电流低频脉动时)
    缓存同步canal实现(订阅binlog)
    mycat2
    【Linux】信号 —— 信号的产生 | 信号的保存 | 信号的处理 | volalite关键字 | SIGCHLD
    linux系统、kylin麒麟系统 添加samba 安装和配置
    从零开始的搭建指南:开发高效的抖音预约服务小程序
    后端返回二进制文件,js 下载为xls或xlsx文件
    使用sed命令进行文本处理示例
    windows系统下,C++统计进程内存使用情况
  • 原文地址:https://blog.csdn.net/liwangcuihua/article/details/125619068