• 多线程与高并发——并发编程(4)


    四、阻塞队列

    1 基础概念

    1.1 生产者消费者概念

    生产者-消费者是设计模式的一种,让生产者和消费者基于一个容器来解决强耦合的问题。生产者与消费者彼此之间不会直接通讯,而是通过一个容器(队列)进行通讯。

    • 生产者生产完数据后扔到容器中,不用等消费者来处理;
    • 消费者也不需要去找生产者要数据,直接从容器中获取即可;
    • 而这种容器最常用的结构就是队列。

    1.2 JUC阻塞队列的存取方法

    常用的存取方法都来自 JUC 包下的 BlockingQueue

    • 生产者存储方法:
      • add(E):添加数据到队列,若队列满了,抛出异常;
      • offer(E):添加数据到队列,若队列满了,返回 false;
      • offer(E,timeout,unit):添加数据到队列,若队列满了,阻塞 timeout 时间,超时后返回 false;
      • put(E):添加数据到队列,若队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等。
    • 消费者取数据方法:
      • remove():从队列中移除数据,若队列为空,抛出异常;
      • poll():从队列中移除数据,若队列为空,返回 false;
      • poll(timeout,unit):从队列中移除数据,若队列为空,阻塞 timeout 时间,等生产者仍数据再获取数据,超时后返回 false;
      • take():从队列中移除数据,若队列为空,挂起线程,一直等生产者仍数据再获取。

    2 ArrayBlockingQueue

    2.1 ArrayBlockingQueue的基本使用

    • ArrayBlockingQueue 在初始化时,必须指定当前队列的长度,因为 ArrayBlockingQueue 是基于数组实现的队列结构,数组长度不可变,必须提前设置数据长度信息。
    public static void main(String[] args) throws InterruptedException {
       
        // 必须设置队列长度
        ArrayBlockingQueue queue = new ArrayBlockingQueue(4);
        // 生产者生产数据
        queue.add("1");
        queue.offer("2");
        queue.offer("3", 2, TimeUnit.SECONDS);
        queue.put("4");
        // 消费者消费数据
        System.out.println(queue.remove());
        System.out.println(queue.poll());
        System.out.println(queue.poll(2, TimeUnit.SECONDS));
        System.out.println(queue.take());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    2.2 生产者方法实现原理

    • 生产者添加数据到队列的方法比较多,需要一个一个看
    2.2.1 ArrayBlockingQueue的常见属性

    ArrayBlockingQueue中的成员变量

    final Object[] items; 				// 就是数组本身
    int takeIndex;						// 取数据的下标
    int putIndex;						// 存数据的下标
    int count;							// 当前数组中元素的个数
    final ReentrantLock lock;			// 就是一个 ReentrantLock 锁
    private final Condition notEmpty;	// 消费者挂起线程和唤醒线程用到的Condition(可看作是synchronized的wait和notify)
    private final Condition notFull;	// 生产者挂起线程和唤醒线程用到的Condition(可看作是synchronized的wait和notify)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2.2.2 add方法
    • add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常
    public boolean add(E e) {
       
        if (offer(e))
            return true;
        else 	// 抛出的异常
            throw new IllegalStateException("Queue full");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2.2.3 offer方法
    public boolean offer(E e) {
       
        checkNotNull(e);	// 要求存储的数据不允许为null,否则抛出空指针异常
    	// 拿到当前阻塞队列的lock锁
        final ReentrantLock lock = this.lock;
        lock.lock();	// 为保证线程安全,加锁
        try {
       
    		// 判断队列中元素是否满了,若满了,则返回false
            if (count == items.length)
                return false;
            else {
       
    			// 队列没满,执行 enqueue 将元素添加到队列中,并返回true
                enqueue(e);
                return true;
            }
        } finally {
       
            lock.unlock();		// 操作完释放锁
        }
    }
    // ================
    private void enqueue(E x) {
       
        // 拿到数组的引用,将元素放到指定的位置
        final Object[] items = this.items;
        items[putIndex] = x;
    	// 对putIndex进行++操作,并判断是否等于数组长度,需要归为
        if (++putIndex == items.length)
            putIndex = 0;	// 归位:将索引值设置为0
        count++;	// 添加成功,数据++
        notEmpty.signal();	// 将一个Condition中阻塞的线程唤醒
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    2.2.4 offer(time,unit)方法

    生产者在添加数据时,如果队列已经满,阻塞一会:

    • 阻塞到消费者消费了消息,然后唤醒当前阻塞线程;
    • 阻塞到了 timeout 时间,再次判断是否可以添加,若不能直接告辞。
    // 线程在挂起时,如果对当前阻塞线程的终端标记位进行设置,会抛出异常直接结束
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
       
    	// 非空校验
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);		// 将时间单位转为纳秒
        final ReentrantLock lock = this.lock;	// 加锁
        lock.lockInterruptibly();	// 允许线程中断排除异常的加锁方法
        try {
       
            // 为什么是while(虚假唤醒)
            while (count == items.length) {
       	// 如果元素个数和数组长度一致,说明队列满了
                if (nanos <= 0)	// 判断等待时间是否充裕
                    return false;	// 不充裕,直接添加失败,返回false
                // 挂起等待,会同时释放锁资源(对标 synchronized 的wait方法)
                // awaitNanos会挂起线程,并且返回剩余的阻塞时间,恢复执行时,需要重新获取锁资源
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e); // 这里锁门队列有空间了,enqueue将数据添加到阻塞队列中,并返回true
            return true;
        } finally {
       
            lock.unlock();	// 是否锁资源
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    2.2.5 put方法
    • 如果队列是满的,就一直挂起,直到被唤醒,或者被中断
    public void put(E e) throws InterruptedException {
       
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
       
            while (count == items.length)
                // await方法会一直阻塞,直到被唤醒或者被中断
                notFull.await();
            enqueue(e);
        } finally {
       
            lock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.3 消费者方法实现原理

    2.3.1 remove方法
    • remove方法本身就是调用了poll方法,如果poll方法返回null,直接抛出异常
    public E remove() {
       
        E x = poll();
        if (x != null)
            return x;
        else	// 没数据抛出异常
            throw new NoSuchElementException();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    2.3.2 poll方法
    // 拉取数据
    public E poll() {
       
        final ReentrantLock lock = this.lock;
        lock.lock();	// 加锁
        try {
       
            // 若没有数据,直接返回null;否则执行dequeue,取出数据并返回
            return (count == 0) ? null : dequeue();
        } finally {
       
            lock.unlock();
        }
    }
    // 取出数据
    private E dequeue() {
       
        // 将成员变量引用到局部变量
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];		// 直接获取指定索引位置的数据
        items[takeIndex] = null;		// 取出数据后,清空该索引位置
        if (++takeIndex == items.length)	// 设置下次取数据的索引位置
            takeIndex = 0;
        count--;	// 数组中元素个数减一
        if (itrs != null)	// 迭代器内容先跳过
            itrs.elementDequeued();
        // signal方法,会唤醒当前Condition中排队的一个Node
        // signalAll方法,会将Condition中所有的Node,全都唤醒
        notFull.signal();
        return x;	// 返回数据
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    2.3.3 poll(timeout,unit)方法
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
       
        long nanos = unit.toNanos(timeout);		// 转换时间单位
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();				// 加锁,可中断唤醒
        try {
       
            while (count == 0) {
       	// 如果没数据
                if (nanos <= 0)		// 也没时间了,就不阻塞,返回null
                    return null;
                // 有时间,就挂起消费者线程一段时间
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();	// 取数据
        } finally {
       
            lock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    2.3.4 take方法
    public E take() throws InterruptedException {
       
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
       
            while (count == 0)	// 使用while,防止虚假唤醒
                notEmpty.await();
            return dequeue();
        } finally {
       
            lock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    2.3.5 虚假唤醒

    阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环,为什么不使用if?

    • 首先肯定不能换成 if 逻辑判断,比如:有线程 A、B、E、C,其中 ABE 是生产者,C是消费者。假如线程的队列是满的,AB挂起
    // E,拿到锁资源,还没有走while判断
    while (count == items.length)
        // A醒了
        // B挂起
        notFull.await();
    enqueue(e)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • C 此时消费一条数据,执行 notFull.signal() 唤醒一个线程,A线程被唤醒;E走判断发现有空余位置,可以添加数据到队列,则E添加数据,走enqueue。
    • 如果判断是 if,A 在E释放锁资源后,拿到锁资源,直接走 enqueue 方法,此时 A线程就是在 putIndex 的位置,覆盖掉之前的数据,会造成数据安全问题。

    3 LinkedBlockingQueue

    3.1 LinkedBlockingQueue的底层实现

    • 查看 LinkedBlockingQueue 是如何存储数据,以及如何实现链表结构的。
    // Node对象就是存储数据的单位
    static class Node<E> {
       
        // 存储的数据
        E item;
    	// 指向下一个数据的指针
        Node<E> next;
    	// 有参构造
        Node(E x) {
        item = x; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 查看LinkedBlockingQueue的有参构造
    // 可以手动指定LinkedBlockingQueue的长度,如果没有指定,默认为Integer.MAX_VALUE
    public LinkedBlockingQueue(int capacity) {
       
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 在初始化时,构建一个item为null的节点,作为head和last,这种node可以成为哨兵Node,
        // 如果没有哨兵节点,那么在获取数据时,需要判断head是否为null,才能找next
        // 如果没有哨兵节点,那么在添加数据时,需要判断last是否为null,才能找next
        last = head = new Node<E>(null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 查看LinkedBlockingQueue的其他属性
    // 因为是链表,没有想数组的length属性,基于AtomicInteger来记录长度
    private final AtomicInteger count = new AtomicInteger();
    transient Node<E> head;	// 链表的头
    • 1
    • 2
  • 相关阅读:
    .NET Core Web APi类库如何内嵌运行?
    Python机器学习015:pytorch快速入门
    不要轻易更换zotero同步的官网账号
    什么是缓存雪崩、缓存穿透、缓存击穿?怎么解决?
    新兴国家战略级安全话题-软件供应链安全
    [JavaScript]_[初级]_[使用HTMLElement.dataset快速读写自定义属性]
    PWN环境搭建
    【matplotlib 实战】--箱型图
    从零开始写 Docker(二)---优化:使用匿名管道传递参数
    关系型数据库RDS基本简介
  • 原文地址:https://blog.csdn.net/yangwei234/article/details/132655222