• Java多线程篇(7)——AQS之共享锁(Semaphore、CountDownLatch)


    1、Semaphore

    在这里插入图片描述

    1.1、acquire

    Semaphore.acquire

        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • 1
    • 2
    • 3

    AbstractQueuedSynchronizer.acquireSharedInterruptibly

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //尝试获取共享资源(默认非公平的实现)    
            if (tryAcquireShared(arg) < 0)
                //若获取资源失败进入这个方法
                doAcquireSharedInterruptibly(arg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Semaphore.tryAcquireShared

       //Semaphore非公平锁的实现
       protected int tryAcquireShared(int acquires) {
           return nonfairTryAcquireShared(acquires);
       }
       final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
               int available = getState();
               int remaining = available - acquires;
               if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
              }
        }
    
    	//Semaphore公平锁的实现
        protected int tryAcquireShared(int acquires) {
           for (;;) {
                if (hasQueuedPredecessors())
                   return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                   compareAndSetState(available, remaining))
                   return remaining;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    AbstractQueuedSynchronizer.doAcquireSharedInterruptibly

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //封装共享节点并入队
            final Node node = addWaiter(Node.SHARED);
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    //如果前一个节点是头结点,再次尝试获锁
                    if (p == head) {
                        //获取共享资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //如果获取成功设置头节点并传播
                            //其实就是成功获取资源后接着唤醒后面的节点,因为是共享资源,所以要尽可能唤醒更多的节点
                            setHeadAndPropagate(node, r);
                            p.next = null;
                            return;
                        }
                    }
                    //如果获锁失败或者前节点不是head的节点就根据前节点的状态来看是否需要阻塞
                    //需要阻塞就调用 LockSupport.park() 阻塞线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    被唤醒后…
    AbstractQueuedSynchronizer.setHeadAndPropagate

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head;
            //设置头结点
            setHead(node);
    
    		//唤醒后继节点
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                //如果是共享节点调用doReleaseShared唤醒后继节点
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    AbstractQueuedSynchronizer.doReleaseShared

     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    //cas(SIGNAL->0) + unpark 
                    if (ws == Node.SIGNAL) {
                        if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                            continue;
                        unparkSuccessor(h);
                    }
                    //cas(0->PROPAGATE)
                    else if (ws == 0 &&
                             !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                        continue;
                }
                //如果head不变,说明已经没有需要唤醒的节点了,则break
                if (h == head)
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    1.2、release

    Semaphore.release

        public void release() {
            sync.releaseShared(1);
        }
    
    • 1
    • 2
    • 3

    AbstractQueuedSynchronizer.releaseShared

        public final boolean releaseShared(int arg) {
            //释放共享资源
            if (tryReleaseShared(arg)) {
                //成功释放则调用 doReleaseShared 唤醒节点
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Semaphore.tryReleaseShared

    	protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    可以发现独占锁只会唤醒一个节点,而共享锁会类似传播一样从第一个被唤醒的节点开始逐次唤醒后面的节点。

    2、CountDownLatch

    在这里插入图片描述

    2.1、await

    CountDownLatch.await

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • 1
    • 2
    • 3

    acquireSharedInterruptibly上面已经分析过了,这里直接看到CountDownLatch的获取共享资源的实现
    CountDownLatch.tryAcquireShared

       protected int tryAcquireShared(int acquires) {
            //如果state为0,则允许获取,反之不允许
            return (getState() == 0) ? 1 : -1;
       }
    
    • 1
    • 2
    • 3
    • 4

    2.2、countDown

    CountDownLatch.countDown

        public void countDown() {
            sync.releaseShared(1);
        }
    
    • 1
    • 2
    • 3

    releaseShared上面也已经分析过了,这里同样直接看到CountDownLatch的释放共享资源的实现
    CountDownLatch.tryReleaseShared

    	protected boolean tryReleaseShared(int releases) {
           for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                // state - 1
                int nextc = c - 1;
                //cas替换state
                if (compareAndSetState(c, nextc))
                    //如果替换后state为0,则返回true,表示可以唤醒节点了
                    return nextc == 0;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    图像分类,看我就够啦!
    超全Chat GPT论文修改指令
    Stable Diffusion 系统教程 | 强大的ControlNet 控制网
    【开源】基于Vue和SpringBoot的高校宿舍调配管理系统
    Taro小程序富文本解析4种方法
    归并排序-面试例子
    Element-plus提交pr有感
    地线干扰的共阻干扰
    前端笔记-关于元素定位的深度理解
    成熟企业级开源监控解决方案Zabbix6.2关键功能实战-下
  • 原文地址:https://blog.csdn.net/qq_43196360/article/details/133580709