• BlockingQueue二


    接着上篇BlockingQueue没讲完的

    LinkedTransferQueue

    LinkedTransferQueue是一个由链表结构组成的无界阻塞队列,相对于其它阻塞队列,LinkedBlockingQueue可以算是LinkedBlockingQueue与SynhronoousQueue结合,LinkedtransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部结构分为数据节点、请求节点,基于CAS无锁算法实现

    与前面类似不再赘述

            final boolean isData;   
            volatile Object item;  
            volatile Node next;
            volatile Thread waiter;
    
    • 1
    • 2
    • 3
    • 4

    其中节点操作过程类似于SynchronousQueue
    在这里插入图片描述
    在这里插入图片描述
    与SynchronousQueue有区别的是这个可以设置是否阻塞当前线程

    NOW=0表示即时操作(可能失败),即不会阻塞调用线程
    poll(获取并移除首元素,如果队列为空,直接返回null)
    tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者则立即返回false,也不会将元素入队)

    ASYNC=1表示异步操作(必然成功)
    xfer被操作线程调用时,无论xfer操作过程多久完成,调用者都不会阻塞等待
    offer,put,add(插入指定元素到队尾,由于是无界队列,所以会立即返回true)

    SYNC=2表示同步操作(阻塞调用线程)
    只有xfer操作过程达到了调用线程所期望的结果,调用者才会继续向下执行

    TIMED=3表示限时同步操作

    PriorityBlockingQueue

    优先级队列,里面是数组,但是数组与普通数组不一样,里面的数组维护了一颗堆的二叉树
    默认大小为11,但是这个可以扩容

    	//默认容量
    	private static final int DEFAULT_INITIAL_CAPACITY = 11;
    	//最大容量
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    	//存储数据
        private transient Object[] queue;
    	//元素个数
        private transient int size;
    	//比较
        private transient Comparator<? super E> comparator;
    	//锁
        private final ReentrantLock lock;
    	//等待
        private final Condition notEmpty;
    	
        private transient volatile int allocationSpinLock;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    扩容

    如果容量小于64的时候,扩容为原来两倍+2;
    如果容量大于64的时候,扩容为原来1.5倍

        private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常操作)
            Object[] newArray = null;
            if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                         0, 1)) {//通过CAS操作确保只有一个线程可以扩容
                try {
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : 
                                           (oldCap >> 1));
                    if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量
                    }
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];//根据最新容量初始化一个新数组
                } finally {
                    allocationSpinLock = 0;
                }
            }
            if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU
                Thread.yield();
            lock.lock();//这里重新加锁是确保数组复制操作只有一个线程能进行
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    添加元素

    添加元素不会阻塞线程,因为该队列是一个无界队列,因为可以扩容,所以添加元素不会出现阻塞

        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    取出元素

    取出元素需要判断是否为空,如果为空则需要等待,不然直接返回

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            E result;
            try {
                while ( (result = dequeue()) == null)
                    notEmpty.await();
            } finally {
                lock.unlock();
            }
            return result;
        }
    
        private E dequeue() {
            int n = size - 1;
            if (n < 0)
                return null;
            else {
                Object[] array = queue;
                E result = (E) array[0];
                E x = (E) array[n];
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftDownComparable(0, x, array, n);
                else
                    siftDownUsingComparator(0, x, array, n, cmp);
                size = n;
                return result;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    里面主要堆的上浮与下沉

    另一个上浮的方法除了比较器不同以外其它都类似,所以就讲这一个
    假设我们构造的是小根堆

        private static <T> void siftUpComparable(int k, T x, Object[] array) {
        	// 其中k就是当前放的末尾的位置
            Comparable<? super T> key = (Comparable<? super T>) x;
            while (k > 0) {
                int parent = (k - 1) >>> 1; //找到其父节点
                Object e = array[parent];
                if (key.compareTo((T) e) >= 0) //如果当前放入的值大于其父节点则跳出,否则继续
                    break;
                // 到这里说明当前放入的值小于其父节点,与父节点交换位置,并且k变为父节点的位置
                array[k] = e; 
                k = parent;
            }
            array[k] = key;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
     private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {
            if (n > 0) {
                Comparable<? super T> key = (Comparable<? super T>)x;
                int half = n >>> 1;           
                while (k < half) {
                    int child = (k << 1) + 1;   
                    Object c = array[child];
                    int right = child + 1;
                    if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                        c = array[child = right];   // 如果右孩子比左孩子小,则弄成右孩子
                    if (key.compareTo((T) c) <= 0) //如果传入的值小于孩子则退出
                        break;
                    array[k] = c;
                    k = child;
                }
                array[k] = key;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    LinkedBlockingDeque

    LinkedBlockingQueue类似,只是这个是可以从两端存取,而LinkedBlockingQueue是单链表只能从一边存取,同时LinkedBlockingDeque只有一把锁,如果两把锁的话容易造成下标出错

    DelayQueue

    其中内部也是由一个PriorityQueue维护一个优先队列

    add

        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                if (q.peek() == e) {
                    leader = null;
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    take

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();  //如果队列为空阻塞
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return q.poll(); //如果到期了就返回
                        first = null; // don't retain ref while waiting
                        if (leader != null) // 没有到期且leader不为空,等待
                            available.await();
                        else { //头节点为空,设置当前线程为头节点
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    Leader-Follower线程模型

    在Leader-follower线程模型中每个线程有三种模式:

    leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待
    follower:会一直尝试争抢leader,抢到leader之后才开始干活
    processing:处理中的线程

    感谢这位大佬 双子孤狼的博客

  • 相关阅读:
    cvpr2022 车道线检测之eigenLanes
    13_星仔带你学Java之接口、内部类、枚举
    车规级共模电感厂家教你贴片共模电感如何选型
    SpringBoot中自动装配机制
    94. 二叉树的中序遍历(递归+迭代)
    C++除法分支
    Redis缓存知识总结
    使用curl测试nodejs的http server
    Vue3数组重新赋值问题
    send line/selection to terminal
  • 原文地址:https://blog.csdn.net/m0_74787523/article/details/128169302