• AQS之ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue阻塞队列特性及源码分析


    系列文章目录

    第一节 synchronized关键字详解-偏向锁、轻量级锁、偏向锁、重量级锁、自旋、锁粗化、锁消除
    第二节 AQS抽象队列同步器原理详解
    第三节 AQS之ReentrantLock特性和源码分析及CAS和LockSupport的核心原理



    前言

    BlockingQueue——是JUC包提供用于解决并发生产者与消费者问题的类,具有在任意时刻只有一个线程可以进行take或者put方法的特性,即执行take和put方法时阻塞,还提供了超时机制,常用于解耦,在很多的生产与消费场景中可以看见,例如需要在一个系统内实现多任务并发执行,可将任务放入阻塞队列存放,多线程进行消费执行。


    一、队列的类型与数据结构

    • 队列可分为无限队列 (几乎可以无限增长)和有限队列 (定义了最大容量)两种类型
    • 是一种存储数据的结构,可用链表或者数组实现
    • 具备FIFO先进先出的特性,也有双端队列(Deque)优先级队列
    • 主要操作包含入队(EnQueue)与出队(Dequeue)

    二、阻塞队列的常见类型

    1、ArrayBlockingQueue-由数组支持的有界队列

    • ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小
    • 在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞,反之则是大量消费线程被阻塞。
    • 使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。

    A、代码案例

    下面是以一个工厂生产包子,然后进行包装消费的案例
    包子类

    package com.xj.queue;
    
    //包子类
    public class Bun {
        String id;//包子编号
        int weight;//包子重量
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public int getWeight() {
            return weight;
        }
    
        public void setWeight(int weight) {
            this.weight = weight;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    包子制作工人线程

    package com.xj.queue;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;
    import java.util.concurrent.BlockingQueue;
    
    //包子制作者
    public class BunProducer implements Runnable{
        static Logger log = LoggerFactory.getLogger(BunProducer.class);
    
        private BlockingQueue<Bun> queue;
        List<Bun> bucket;
    
        public BunProducer(BlockingQueue<Bun> queue,List<Bun> bucket) {
            this.queue = queue;
            this.bucket = bucket;
        }
    
        @Override
        public void run() {
            while (true){
                try{
                    if(BunStarter.producer_counter.get() >= BunStarter.bunNumber){
                        log.info("制作工人:{},本次需要生产的包子数量达到{},将停止生产",Thread.currentThread().getName(),BunStarter.bunNumber);
                        break;
                    }
                    Bun bun = new Bun();
                    bun.setId(UUID.randomUUID().toString());
                    bun.setWeight(new Random().nextInt(10) + 95);//重量在:95-104
                    if(bun.getWeight() >= BunStarter.qualifiedWeight){
                        this.queue.put(bun);
                        int count = BunStarter.producer_counter.incrementAndGet();
                        log.info("制作工人:{},包子ID:{},重量:{}合格,投入生产线队列进行包装,目前生产总额为{}",Thread.currentThread().getName(),bun.getId(),bun.getWeight(),count);
                    }else {
                        log.info("制作工人:{},包子ID:{},重量:{}不合格,将放入不合格桶",Thread.currentThread().getName(),bun.getId(),bun.getWeight());
                        bucket.add(bun);
                    }
                }catch (InterruptedException e){
                    log.info("制作工人:{},被中断,将停止包子制作",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    包子消费工人线程

    package com.xj.queue;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    //包子包装者
    public class BunConsumer implements Runnable {
        static Logger log = LoggerFactory.getLogger(BunConsumer.class);
    
        private BlockingQueue<Bun> queue;
    
        public BunConsumer(BlockingQueue<Bun> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while(true){
                try{
                    if(BunStarter.consumer_counter.get() >= BunStarter.bunNumber){
                        log.info("包装工人:{},本次需要包装的包子数量达到{},将停止包装",Thread.currentThread().getName(),BunStarter.bunNumber);
                        break;
                    }
                    //Bun bun = queue.take();//会一直阻塞
                    Bun bun = queue.poll(10, TimeUnit.SECONDS);//阻塞10s
                    if(bun != null){
                        log.info("包装工人:{},包子ID:{},开始进行包装,目前包装总额为{}",Thread.currentThread().getName(),bun.getId(),BunStarter.consumer_counter.incrementAndGet());
                    }
                }catch (InterruptedException e) {
                    log.info("包装工人:{},被中断,将停止包子包装",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    包子生产条线启动器

    package com.xj.queue;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    //包子生产条线启动器
    public class BunStarter {
        //包子制作工人数目
        public static final int bunProducerNum = 10;
        //包子包装工人数目
        public static final int bunConsumerNum = 10;
    
        //需要生产进行包装的包子总个数
        public static final int bunNumber = 5000;
    
        //定义包子重量不足100g为不合格产品
        public static final int qualifiedWeight = 100;
    
        //生产包子计数器,原子操作
        public static AtomicInteger producer_counter = new AtomicInteger();
    
        //消费包子计数器,原子操作
        public static AtomicInteger consumer_counter = new AtomicInteger();
    
        public static void main(String[] args) {
            //储存包子的队列大小容量为100(类似流水线的长度)
            BlockingQueue<Bun> bunQueue = new ArrayBlockingQueue<Bun>(100);
    
            //存放所有不合格包子产品,保证线程安全
            List<Bun> bucket = Collections.synchronizedList(new ArrayList<Bun>());
    
            //多个包子制作工人
            for (int i = 1; i <= bunProducerNum; i++) {
                new Thread(new BunProducer(bunQueue,bucket)).start();
            }
    
            //多个包子包装工人
            for (int i = 1; i <= bunConsumerNum; i++) {
                new Thread(new BunConsumer(bunQueue)).start();
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    结果打印:
    在这里插入图片描述

    B、源码分析

    利用Lock锁的Condition通知机制进行阻塞控制,一把ReentrantLock锁、两个条件(notEmpty、notFull)。

    (1)、初始化队列

    构造阻塞队列

    new ArrayBlockingQueue<Bun>(100);
    
    • 1

    使用的是ReentrantLock非公平锁以及条件Condition

    public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            //队列数组
            this.items = new Object[capacity];
            //非公平锁
            lock = new ReentrantLock(fair); //公平,非公平
            //两个条件队列notEmpty和notFull
            notEmpty = lock.newCondition();//队列不空
            notFull =  lock.newCondition();//队列不满
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    (2)、入队PUT方法
    	/**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            //加锁,如果线程中断抛出异常 
            lock.lockInterruptibly();
            try {
            	//循环判断队列是否满了
            	//为什么要使用while,而不是if判断呢?是为了防止虚假唤醒
            	/*if只会判断一次,如果条件满足,也就阻塞一次;使用while可以在唤醒之后继续判断条件
            	*/
                while (count == items.length)
                	//在notFull条件上等待,并释放锁
                    notFull.await();
                // 入队
                enqueue(e);
            } finally {
            	//解锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    enqueue(e)方法操作入队逻辑

    	/**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            //放入数组
            items[putIndex] = x;
    		//环形数组:putIndex指针到数组尽头了,返回头部
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            //唤醒notEmpty条件队列
            notEmpty.signal();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    signal方法将条件等待队列中

    public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                //判断头节点是否为空
                if (first != null)
                    doSignal(first);
    }
    
    private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) && (first = firstWaiter) != null);//从条件队列转移到同步队列成功,退出循环,反之则继续调用transferForSignal方法
    }
    
    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            //将node节点的状态修改为0
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            //入同步等待队列
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    await方法在接口Condition定义,在AQS里实现

    public final void await() throws InterruptedException {
    			//如果线程被中断,抛出InterruptedException异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //将线程加入条件等待队列
                Node node = addConditionWaiter();
                //释放当前锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                //循环判断node节点是否在同步等待队列
                while (!isOnSyncQueue(node)) {
                	//阻塞
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //节点线程再次去申请锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    addConditionWaiter方法添加到条件等待队列

    /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
            	//t指向尾结点
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                //如果尾结点不为空并且waitStatus信号量不是condition
                if (t != null && t.waitStatus != Node.CONDITION) {
                	//清除掉不符合条件的节点
                    unlinkCancelledWaiters();
                    //t继续指向尾结点
                    t = lastWaiter;
                }
                //构造条件节点
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                //尾结点t为空,头节点firstWaiter指针指向当前节点
                if (t == null)
                	// 初始化队列
                    firstWaiter = node;
                else
                	//尾结点t的下一个节点nextWaiter指针指向当前节点
                    t.nextWaiter = node;
                //尾结点指针lastWaiter指向当前节点
                lastWaiter = node;
                return node;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    unlinkCancelledWaiters方法:遍历等待队列,将非CONDITION状态到的节点移除

    private void unlinkCancelledWaiters() {
    	//t指针指向头节点
        Node t = firstWaiter; 
        //记录遍历进度节点的指针
        Node trail = null;
        //如果头节点不为空
        while (t != null) { 
        	//遍历等待队列,保存当前t的下一个节点对象
            Node next = t.nextWaiter; 
            //如果当前t节点的状态不是为CONDITION
            if (t.waitStatus != Node.CONDITION) { 
            	//将t节点移除队列,t节点的nextWaiter指针指向null
                t.nextWaiter = null;
                //第一次遍历,进度节点为null
                if (trail == null)
                	//头节点指向被移除节点的下一个节点
                    firstWaiter = next; 
                else
                	//进度节点不为null,则指向下一个节点
                    trail.nextWaiter = next; 
                //如果next为空,表明队列遍历完成,将尾指针指向进度节点
                if (next == null)
                    lastWaiter = trail;
            }
            else //如果当前t节点的状态是CONDITION
            	//保存进度节点
                trail = t;
            //指针t指向下一个节点
            t = next;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    (3)、出队TAKE方法
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //加锁,如果线程中断抛出异常 
        lock.lockInterruptibly();
        try {
           //如果队列为空
            while (count == 0)
                notEmpty.await();//在notEmpty条件上等待,并释放锁
            //出队
            return dequeue();
        } finally {
            lock.unlock();// 解锁,唤醒其它线程
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    dequeue出队方法

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex]; //取出takeIndex位置的元素
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0; // 环形数组,takeIndex 指针到数组尽头了,返回头部
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();//notFull条件队列转同步队列
        return x;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2、LinkedBlockingQueue-由链表支持的可选有界队列

    • LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
    • LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素
    • LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行

    源码分析

    (1)、初始化队列
        /**
         * Creates a {@code LinkedBlockingQueue} with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        //无界队列
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
        /**
         * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
         *
         * @param capacity the capacity of this queue
         * @throws IllegalArgumentException if {@code capacity} is not greater
         *         than zero
         */
        //有界队列
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    (2)、常用属性
    //容量,指定容量就是有界队列
    private final int capacity;
    
    //元素数量
    private final AtomicInteger count = new AtomicInteger();
    
    //头节点
    transient Node<E> head;
    
    //尾节点
    private transient Node<E> last;
    
    //take锁,锁分离,提高效率
    private final ReentrantLock takeLock = new ReentrantLock();
    
    //notEmpty条件
    //当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
    private final Condition notEmpty = takeLock.newCondition();
    
    //put锁,锁分离,提高效率
    private final ReentrantLock putLock = new ReentrantLock();
    
    //notFull条件
    //当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
    private final Condition notFull = putLock.newCondition();
    
    //单链表Node元素
    static class Node<E> {
        E item;  //存储元素
        Node<E> next;  //后继节点    单链表结构
        Node(E x) { item = x; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    (3)、入队PUT方法
    public void put(E e) throws InterruptedException {
    		//元素为null,抛出空指针异常
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            //构建Node节点
            Node<E> node = new Node<E>(e);
            //put操作锁
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            //加put锁
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                //判断元数个数是否满了
                while (count.get() == capacity) {
                    notFull.await();//释放锁,当前线程加入条件等待队列
                }
                //链表入队
                enqueue(node);
                //获取元数个数后,原子加1
                c = count.getAndIncrement();
                //如果少于容量
                if (c + 1 < capacity)
                	//可唤醒生产者线程(唤醒阻塞在notFull条件上的线程),从条件等待队列转移到同步等待队列,继续生产入队,等待unlock释放锁
                    notFull.signal();
            } finally {
            	//put锁释放,真正唤醒生产者线程
                putLock.unlock();
            }
            //如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
            if (c == 0)
                signalNotEmpty();
    }
    
    //存节点从last进
    private void enqueue(Node<E> node) { 
        //尾节点的下一个节点指向node节点,尾节点last指向入队元素
        last = last.next = node;
    }    
    
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock; 
        takeLock.lock();// 加take锁
        try {
        	// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();  // 真正唤醒消费者线程
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    (4)、出队TAKE方法
    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            //加take锁
            takeLock.lockInterruptibly();
            try {
            	//队列元素个数为0
                while (count.get() == 0) {
                	//在notEmpty条件上阻塞并释放锁,加入到条件等待队列
                    notEmpty.await();
                }
                //出队
                x = dequeue();
                //获取元数个数后,原子减1
                c = count.getAndDecrement();
                //如果取之前队列元素个数大于1
                if (c > 1)
                	//notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
                    notEmpty.signal();
            } finally {
            	//真正唤醒消费者线程
                takeLock.unlock();
            }
            // 如果取之前队列元素个数等于容量(表示已满),则唤醒阻塞在notFull的线程
            if (c == capacity)
                signalNotFull();
            return x;
    }
    
    //取节点从head出
    private E dequeue() {
        //head节点不存储item元素值
        //删除head,并把head下一个节点作为新的head节点,并把其值置空,返回原来的值
        Node<E> h = head;//头节点
        Node<E> first = h.next;//头节点的后继节点为出队节点
        h.next = h; // 方便GC,h.next指向空
        head = first;//头节点指向first节点
        E x = first.item;//返回first节点item值
        first.item = null;//清空first节点的item值
        return x;
    }
    
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
        } finally {
            putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    3、SynchronousQueue-无缓冲的等待队列

    • SynchronousQueue是一个无数据缓冲的阻塞队列,生产者线程对其的put插入操作必须等待消费者的take移除操作,这也就导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取
    • 适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务

    A、代码案例

    package com.xj.queue;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class SynchronousQueueExample {
        public static void main(String[] args) throws InterruptedException {
        	//默认false为非公平模式,true为公平模式
            BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        Integer value = synchronousQueue.take();
                        System.out.println(Thread.currentThread().getName()+":"+value);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            },"consumer1").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        Integer value = synchronousQueue.take();
                        System.out.println(Thread.currentThread().getName()+":"+value);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            },"consumer2").start();
    
            Thread.sleep(1000);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        synchronousQueue.put(1);
                        System.out.println(Thread.currentThread().getName()+":"+1);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            },"product1").start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        synchronousQueue.put(2);
                        System.out.println(Thread.currentThread().getName()+":"+2);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            },"product2").start();
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    结果打印,默认采用的非公平模式

    product1:1
    consumer2:1
    product2:2
    consumer1:2
    
    Process finished with exit code 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    B、源码分析

    (1)、初始化队列

    非公平模式基于链表队列实现

    BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
    
    • 1

    公平模式基于栈结构实现

    BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>(true);
    
    • 1

    默认构造方法采用非公平模式,TransferQueue和TransferStack都继承Transferer抽象类并重写了transfer方法

    	/**
         * Creates a {@code SynchronousQueue} with nonfair access policy.
         */
        public SynchronousQueue() {
            this(false);
        }
    
        /**
         * Creates a {@code SynchronousQueue} with the specified fairness policy.
         *
         * @param fair if true, waiting threads contend in FIFO order for
         *        access; otherwise the order is unspecified.
         */
        public SynchronousQueue(boolean fair) {
            transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Transferer抽象类

    /**
         * Shared internal API for dual stacks and queues.
         */
        abstract static class Transferer<E> {
            /**
             * Performs a put or take.
             *
             * @param e if non-null, the item to be handed to a consumer;
             *          if null, requests that transfer return an item
             *          offered by producer.
             * @param timed if this operation should timeout
             * @param nanos the timeout, in nanoseconds
             * @return if non-null, the item provided or received; if null,
             *         the operation failed due to timeout or interrupt --
             *         the caller can distinguish which of these occurred
             *         by checking Thread.interrupted.
             */
            abstract E transfer(E e, boolean timed, long nanos);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    (2)、入队PUT方法
    public void put(E e) throws InterruptedException {
    		//put元素为空,抛出空指针异常
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
            	//如果判断成立,说明是中断唤醒的,则设置中断标记并抛出中断异常
                Thread.interrupted();
                throw new InterruptedException();
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    (2)、出队TAKE方法
     public E take() throws InterruptedException {
     		//
            E e = transferer.transfer(null, false, 0);
            //take元素不为空返回
            if (e != null)
                return e;
            //如果take元素为null,说明是中断唤醒的,则设置中断标记并抛出中断异常
            Thread.interrupted();
            throw new InterruptedException();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    (3)、transfer方法

    不论出队还是入队操作都调用了transfer方法

    //抽象方法,第一个参数为put元素值,为null说明是take操作
    abstract E transfer(E e, boolean timed, long nanos);
    
    • 1
    • 2
    公平模式-TransferQueue

    采用链表数据结构,QNode节点结构如下:

    static final class QNode {
    			//下一个节点
                volatile QNode next;          // next node in queue
                //节点元素
                volatile Object item;         // CAS'ed to or from null
                //节点所属的线程
                volatile Thread waiter;       // to control park/unpark
                //put和take的标记,put为true,take为false
                final boolean isData;
    
                QNode(Object item, boolean isData) {
                    this.item = item;
                    this.isData = isData;
                }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    TransferQueue构造方法

    TransferQueue() {
        //构造空节点,并设置isData为false,表示为take的节点
        QNode h = new QNode(null, false); // initialize to dummy node.
        //头节点和尾节点都指向了当前空节点
        head = h;
        tail = h;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    重写transfer方法

    E transfer(E e, boolean timed, long nanos) {
                QNode s = null; // constructed/reused as needed
                //判断当前e元素是否为空,e为空说明是take返回false,e不为空为put返回true
                boolean isData = (e != null);
    			
    			//自旋,保证CAS执行成功
                for (;;) {
                	//指向尾节点
                    QNode t = tail;
                    //指向头节点
                    QNode h = head;
                    
                    // 头尾为空,说明没有被初始化,继续下次循环
                    if (t == null || h == null)         // saw uninitialized value
                        continue;                       // spin
    				
    				//头节点和尾节点相等(队列中只有一个节点或者是只有一个空节点)或者模式相同(队尾节点的模式匹配)
                    if (h == t || t.isData == isData) { // empty or same-mode
                    	//获取尾节点的下一个节点
                        QNode tn = t.next;
                        //尾结点不一致,继续下次循环(由于其他节点入队了,导致读不一致)
                        if (t != tail)                  // inconsistent read
                            continue;
                        //尾节点的后继节点不为空
                        if (tn != null) {               // lagging tail
                        	//CAS修改尾节点
                            advanceTail(t, tn);
                            //下次继续循环
                            continue;
                        }
                        if (timed && nanos <= 0)        // can't wait
                            return null;
                        //当前的节点不存在,构建QNode新节点
                        if (s == null)
                            s = new QNode(e, isData);
                        //CAS修改节点s的next指针
                        if (!t.casNext(null, s))        // failed to link in
                        	//修改失败,则下次继续循环
                            continue;
    					//CAS将当前节点s修改为尾节点
                        advanceTail(t, s);              // swing tail and wait
                        //自旋阻塞
                        Object x = awaitFulfill(s, e, timed, nanos);
                        //节点被取消、中断或超时
                        if (x == s) {                   // wait was cancelled
                        	//清除当前s节点
                            clean(t, s);
                            //返回null
                            return null;
                        }
    					
    					//没有被取消、中断或超时
                        if (!s.isOffList()) {           // not already unlinked
                        	// 如果是头节点,从队列中移除
                            advanceHead(t, s);          // unlink if head
                            //当前元素不是空的
                            if (x != null)              // and forget fields
                            	//将当前节点的item执行当前节点
                                s.item = s;
                            //当前节点的持有线程变为null
                            s.waiter = null;
                        }
                        //返回处理后的元素,take操作传入null返回数据,put操作传入数据e返回null
                        return (x != null) ? (E)x : e;
    				//模式不同的逻辑
                    } else {                            // complementary-mode
                    	//获取头节点的后继节点
                        QNode m = h.next;               // node to fulfill
                        //尾节点不同||头节点的后继为空||头节点不同,则下次继续循环
                        if (t != tail || m == null || h != head)
                            continue;                   // inconsistent read
    					//获取头节点的后继节点元素
                        Object x = m.item;
                        if (isData == (x != null) ||    // m already fulfilled 节点位置有值
                            x == m ||                   // m cancelled 节点被取消、中断或超时
                            !m.casItem(x, e)) {         // lost CAS CAS修改item(无值修改为有值,有值修改为无值)
                            advanceHead(h, m);          // dequeue and retry
                            continue;
                        }
                        advanceHead(h, m);              // successfully fulfilled
                        //解除线程驻留,即唤醒线程继续工作
                        LockSupport.unpark(m.waiter);
                        //返回处理后的元素,take传入null返回数据e,put传入数据返回null
                        return (x != null) ? (E)x : e;
                    }
                }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    awaitFulfill方法,自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)

            /**
             * Spins/blocks until node s is fulfilled.
             *
             * @param s the waiting node 等待节点
             * @param e the comparison value for checking match 元素
             * @param timed true if timed wait 超时等待
             * @param nanos timeout value 超时值
             * @return matched item, or s if cancelled
             */
            Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
                /* Same idea as TransferStack.awaitFulfill */
                //计算超时时间,true则计算,false则就是0
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
               	//当前线程对象
                Thread w = Thread.currentThread();
                //计算需要自旋的次数
                //如果头节点的后继节点就是s当前节点,根据cpu核心线程数计算相关的次数(例如32*16=512次数)
                int spins = ((head.next == s) ?
                             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
                //自旋
                for (;;) {
                	//判断中断标志,不会清除线程的状态标记
                    if (w.isInterrupted())
                    	// 尝试取消当前节点
                        s.tryCancel(e);
                    // 获取当前节点的元素
                    Object x = s.item;
                    // 元素不相同,直接返回当前的元素(模式不匹配之后会将item由null变为e,由e变成null,unpark之后判断会成立返回x)
                    if (x != e)
                        return x;
                    // 如果设置了超时时间,判断超时时间
                    if (timed) {
                    	// 计算剩余时间
                        nanos = deadline - System.nanoTime();
                        // 剩余时间小于0
                        if (nanos <= 0L) {
                        	// 尝试取消当前节点
                            s.tryCancel(e);
                            // 继续下次循环
                            continue;
                        }
                    }
                    //自旋的次数大于0,执行次数自减操作
                    //自旋次数用完之后,设置waiter为当前线程
                    //其次判断没有设置超时,则直接进行阻塞park
                    if (spins > 0)
                    	//次数减1
                        --spins;
                    else if (s.waiter == null)
                    	//自旋次数用完之后,设置一下s的等待线程为当前线程
                        s.waiter = w;
                    else if (!timed)
                    	//没有设置超时,则直接进行阻塞
                        LockSupport.park(this);
                    else if (nanos > spinForTimeoutThreshold)
                    	// 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
                        LockSupport.parkNanos(this, nanos);
                }
            }
    
    void tryCancel(Object cmp) {
        // CAS将传入的变量设置为this
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    非公平模式-TransferStack

    后进先出的栈结构,采用链表实现

    static final class SNode {
                volatile SNode next;        // next node in stack 后继节点
                volatile SNode match;       // the node matched to this  与之匹配成功的节点
                volatile Thread waiter;     // to control park/unpark 节点所属线程
                Object item;                // data; or null for REQUESTs 节点元素
                int mode;  //节点模式,DATA(put)和REQUEST(take)两种模式
                // Note: item and mode fields don't need to be volatile
                // since they are always written before, and read after,
                // other volatile/atomic operations.
    
                SNode(Object item) {
                    this.item = item;
                }
                
                static SNode snode(SNode s, Object e, SNode next, int mode) {
                	//s节点为空构建
    	            if (s == null) s = new SNode(e);
    	            //指定s节点的后继和模式
    	            s.mode = mode;
    	            s.next = next;
    	            return s;
            	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    重写transfer方法

            /**
             * Puts or takes an item.
             */
            @SuppressWarnings("unchecked")
            E transfer(E e, boolean timed, long nanos) {
                /*
                 * Basic algorithm is to loop trying one of three actions:
                 *
                 * 1. If apparently empty or already containing nodes of same
                 *    mode, try to push node on stack and wait for a match,
                 *    returning it, or null if cancelled.
                 *
                 * 2. If apparently containing node of complementary mode,
                 *    try to push a fulfilling node on to stack, match
                 *    with corresponding waiting node, pop both from
                 *    stack, and return matched item. The matching or
                 *    unlinking might not actually be necessary because of
                 *    other threads performing action 3:
                 *
                 * 3. If top of stack already holds another fulfilling node,
                 *    help it out by doing its match and/or pop
                 *    operations, and then continue. The code for helping
                 *    is essentially the same as for fulfilling, except
                 *    that it doesn't return the item.
                 */
    
                SNode s = null; // constructed/reused as needed
                //e为空REQUEST模式(take),e不为空DATA模式(put)
                int mode = (e == null) ? REQUEST : DATA;
    
    			//自旋
                for (;;) {
                	//h指向头节点
                    SNode h = head;
                    //头节点为null说明栈为空 || 模式一样 压栈
                    if (h == null || h.mode == mode) {  // empty or same-mode
                    	//设置了超时时间并且时间小于等于0
                        if (timed && nanos <= 0) {      // can't wait
                        	//头节点不为空并且头节点被取消
                            if (h != null && h.isCancelled())
                                casHead(h, h.next);     // pop cancelled node
                            else
                            	//反之返回null
                                return null;
                        } else if (casHead(h, s = snode(s, e, h, mode))) {//没有设置超时,将s节点作为头节点,头节点作为s节点的后继,即压栈操作
                        	//压栈成功,则自旋并阻塞当前线程,等待解除阻塞返回匹配成功的节点
                            SNode m = awaitFulfill(s, timed, nanos);
                            //解除阻塞后,如果
                            if (m == s) {               // wait was cancelled
                                clean(s);
                                return null;
                            }
                            if ((h = head) != null && h.next == s)
                                casHead(h, s.next);     // help s's fulfiller
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        }
                    } else if (!isFulfilling(h.mode)) { // try to fulfill 头节点模式mode,判断是否模式是否为FULFILLING,不为FULFILLING往下执行
                        if (h.isCancelled())            // already cancelled 头节点被取消
                            casHead(h, h.next);         // pop and retry 将头节点的下一个节点作为新的头节点
                        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//没被取消,将模式为2的节点压栈,并作为新的头节点
                        	//自旋
                            for (;;) { // loop until matched or waiters disappear
                            	//s节点当前模式为2,s.next为匹配节点
                                SNode m = s.next;       // m is s's match
                                //m的节点为空
                                if (m == null) {        // all waiters are gone 所有等待线程都执行完了
                                    casHead(s, null);   // pop fulfill node 如果s为头节点则重置s节点为null
                                    s = null;           // use new node next time s节点指向空
                                    break;              // restart main loop 退出for循环
                                }
                                //m节点的下一个
                                SNode mn = m.next;
                                if (m.tryMatch(s)) {
                                    casHead(s, mn);     // pop both s and m
                                    return (E) ((mode == REQUEST) ? m.item : s.item);
                                } else                  // lost match
                                    s.casNext(m, mn);   // help unlink
                            }
                        }
                    } else {                            // help a fulfiller 头节点模式是FULFILLING
                        SNode m = h.next;               // m is h's match
                        if (m == null)                  // waiter is gone
                            casHead(h, null);           // pop fulfilling node
                        else {
                            SNode mn = m.next;
                            if (m.tryMatch(h))          // help match
                                casHead(h, mn);         // pop both h and m
                            else                        // lost match
                                h.casNext(m, mn);       // help unlink
                        }
                    }
                }
            }
    		//m的取值可能为0|1|2,FULFILLING为2,要使判断成立,只能取2
    		static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
    
    		//尝试匹配节点
    		boolean tryMatch(SNode s) {
    				//match为null && 修改SNode的match成员变量,这里的this指调此方法的节点对象,从null修改为入参s节点
                    if (match == null &&
                        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                         //获取调用此方法的对象的所属线程
                        Thread w = waiter;
                        if (w != null) {    // waiters need at most one unpark 所属线程不为空
                            waiter = null; //重置
                            LockSupport.unpark(w); //唤醒
                        }
                        return true; //匹配成功
                    }
                    return match == s;
             }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    awaitFulfill方法自旋或阻塞线程,直到匹配成功

            /**
             * Spins/blocks until node s is matched by a fulfill operation.
             *
             * @param s the waiting node
             * @param timed true if timed wait
             * @param nanos timeout value
             * @return matched node, or s if cancelled
             */
            SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            	//计算截止时间
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                //获取当前线程对象
                Thread w = Thread.currentThread();
                //计算自旋次数,根据机器CPU核心线程数进行计算
                int spins = (shouldSpin(s) ?
                             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
                //自旋
                for (;;) {
                	//判断中断标记不清除
                    if (w.isInterrupted())
                    	//尝试取消节点
                        s.tryCancel();
                    //获取匹配节点
                    SNode m = s.match;
                    //判断匹配节点不为null则返回
                    if (m != null)
                        return m;
                    //如果设置了超时,计算剩余时间
                    if (timed) {
                        nanos = deadline - System.nanoTime();
                        //剩余时间少于等于0,尝试取消节点
                        if (nanos <= 0L) {
                            s.tryCancel();
                            //下次继续循环
                            continue;
                        }
                    }
                    //判断自旋次数是否还有剩余
                    if (spins > 0)
                    	//
                        spins = shouldSpin(s) ? (spins-1) : 0;
                    else if (s.waiter == null)
                    	//自旋次数用完之后,设置所属线程
                        s.waiter = w; // establish waiter so can park next iter 
                    else if (!timed)
                    	//没有设置超时则park阻塞
                        LockSupport.park(this);
                    else if (nanos > spinForTimeoutThreshold)
                    	//超时时间大于1000L,使用判断时间的阻塞(时间nanos大于0才会阻塞)
                        LockSupport.parkNanos(this, nanos);
                }
            }
    
    		boolean shouldSpin(SNode s) {
    			//头节点
                SNode h = head;
                //头节点跟当前节点相同 || 头节点为null || 如果m的位设置满足则返回true
                return (h == s || h == null || isFulfilling(h.mode));
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
  • 相关阅读:
    java如何创建一个只读集合呢?
    机器人制作开源方案 | 扫地机器人
    大数据培训MR支持的压缩编码
    linux内核中修改TCP MSS值
    【MySQL系列】索引的学习及理解
    ITIL4背景下,ITSM产品应具备哪些特点?
    《网络安全笔记》第二章:Windows基础命令
    Haproxy搭建Web群集
    Android系统的特性
    给电脑重装系统后Win11如何重置记事本?
  • 原文地址:https://blog.csdn.net/IUNIQUE/article/details/125681637