• java数据结构--阻塞队列


    目录

    一.概念

    二.生产者消费者问题

    三.阻塞队列接口BlockingQueue

    四.基于数组实现单锁的阻塞队列

    1.加锁方式

    2.代码实现

    3.解释说明

    (1).offer添加元素

    (2)poll取出元素

    4.timeout超时时间

    5.测试

    五.基于数组实现双锁的阻塞队列

    1.问题

    2.关于size共享变量

    3.死锁问题

     4.级联唤醒

    (1)offer中只唤醒一次,其他交给poll线程唤醒

    (2)poll中只唤醒一次,其他交给offer线程唤醒

    5.完整代码


    一.概念

    阻塞队列是一种特殊类型的队列,具有额外的阻塞操作。在阻塞队列中,当队列为空时,从队列中获取元素的操作会被阻塞,直到有元素被添加到队列中为止;当队列满时,向队列中添加元素的操作会被阻塞,直到队列有空闲位置为止。

    阻塞队列在多线程编程中非常有用,可以有效地进行线程间的协调和通信。它提供了一种线程安全的方式来共享数据,避免了常见的并发问题,如资源争用和死锁。

    常见的阻塞队列有以下几种实现方式:

    1. ArrayBlockingQueue:基于数组实现的有界阻塞队列,需要指定队列的容量。
    2. LinkedBlockingQueue:基于链表实现的可选有界或无界阻塞队列。
    3. PriorityBlockingQueue:基于堆实现的无界优先级阻塞队列,元素按照优先级进行排序。
    4. SynchronousQueue:特殊的阻塞队列,每个插入操作必须等待一个对应的删除操作,反之亦然。

    阻塞队列提供了一些常用的操作方法,如put()和take()等。put()方法用于向队列中添加元素,并在队列满时阻塞调用线程;take()方法用于获取队列中的元素,并在队列为空时阻塞调用线程。

    使用阻塞队列可以简化并发编程的实现,提高代码的可读性和维护性。它能够有效地控制线程的访问顺序,并提供了一种直观的方式来处理线程间的同步问题。

    二.生产者消费者问题

     我们先来分析一下,在我们之前学过的操作系统中,有一个生产者和消费者问题,当生产者每次生产完一个产品后,要往缓冲区中放入,但是如果此时缓冲区是满的,那么就要让生产者进入阻塞状态,进行等待,当消费者从缓冲区中取出一个产品消费后,缓冲区有了空位,此时生产者就被唤醒,往缓冲区中放入生产的产品,如果有多个生产者和消费者,此时为了防止出现混乱,就要在放入缓冲区之前加锁,放入后解锁,消费者在从缓冲区取之前也要加锁,取出后再解锁

    三.阻塞队列接口BlockingQueue

    1. /**
    2. * 阻塞队列接口定义
    3. * @param
    4. */
    5. public interface BlockingQueue {
    6. void offer(E e) throws InterruptedException;
    7. boolean offer(E e,long timeout) throws InterruptedException;
    8. E poll() throws InterruptedException;
    9. E poll(long timeout) throws InterruptedException;
    10. boolean isFull();
    11. boolean isEmpty();
    12. }

      我们定义了接口,其中,主要就是成对的两个方法,offer和poll,还有在此基础上加入超时时间的,加入超时时间是,当队列满时,超过了等待时间,就不去添加了,直接返回失败false

    四.基于数组实现单锁的阻塞队列

    1.加锁方式

      这里我们先说明一下,在java中,可以有两种方式加锁

      1.synocized关键字加锁

     2.ReentrantLock类创建可重入锁对象,该对象还可以创建出条件对象,来执行阻塞和唤醒操作

    2.代码实现

      我们先来用单锁实现一下阻塞队列

    1. /**
    2. * 基于数组实现阻塞队列
    3. * @param
    4. */
    5. public class ArrayBlockQueue implements BlockingQueue{
    6. private final E[] array;
    7. private int head;
    8. private int tail;
    9. private int size;
    10. @SuppressWarnings("all")
    11. public ArrayBlockQueue(int capacity){
    12. array = (E[]) new Object[capacity];
    13. }
    14. //锁对象
    15. private final ReentrantLock reentrantLock = new ReentrantLock();
    16. //条件对象
    17. private final Condition headWaits = reentrantLock.newCondition();
    18. private final Condition tailWaits = reentrantLock.newCondition();
    19. /**
    20. * 向队尾添加元素,如果队列为满,则阻塞当前线程
    21. * @param e 要添加元素
    22. * @throws InterruptedException
    23. */
    24. @Override
    25. public void offer(E e) throws InterruptedException {
    26. //先加锁
    27. reentrantLock.lock();
    28. try{
    29. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    30. while (isFull()){
    31. //如果是满的,就让当前线程阻塞
    32. tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
    33. }
    34. //当队列不满时,可以进行添加
    35. array[tail] = e;
    36. //先判断tail是否越界
    37. if(++tail == array.length){
    38. tail = 0;
    39. }
    40. //让size+1
    41. size++;
    42. //这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
    43. headWaits.signal(); //唤醒poll线程
    44. }finally {
    45. //释放锁
    46. reentrantLock.unlock();
    47. }
    48. }
    49. /**
    50. * 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
    51. * @param e 要添加的元素
    52. * @param timeout 超时时间
    53. * @return 是否添加成功
    54. * @throws InterruptedException
    55. */
    56. @Override
    57. public boolean offer(E e, long timeout) throws InterruptedException {
    58. //先加锁
    59. reentrantLock.lock();
    60. try{
    61. //将传入的时间转换为纳秒
    62. long t = TimeUnit.MILLISECONDS.toNanos(timeout);
    63. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    64. while (isFull()){
    65. /**
    66. * 如果是满的,就让当前线程阻塞
    67. * 我们用加入时间的方法来阻塞
    68. * 注意这里我们每次阻塞完唤醒后,就更新等待时间
    69. * 如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
    70. * 如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
    71. * 如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
    72. **/
    73. if(t <= 0){
    74. return false;
    75. }
    76. t = tailWaits.awaitNanos(t);
    77. }
    78. //当队列不满时,可以进行添加
    79. array[tail] = e;
    80. //先判断tail是否越界
    81. if(++tail == array.length){
    82. tail = 0;
    83. }
    84. //让size+1
    85. size++;
    86. //这时要通知poll,因为放入了一个元素,所以队列肯定不为空,通知poll线程可以取元素了
    87. headWaits.signal(); //唤醒poll线程
    88. /**
    89. * 到这里就说明添加成功,返回true
    90. */
    91. return true;
    92. }finally {
    93. //释放锁
    94. reentrantLock.unlock();
    95. }
    96. }
    97. /**
    98. * 移除队头元素,如果队列为空,则阻塞当前线程
    99. * @return 队头元素
    100. * @throws InterruptedException
    101. */
    102. @Override
    103. public E poll() throws InterruptedException {
    104. //先加锁
    105. reentrantLock.lock();
    106. try {
    107. //先循环判断队列是否为空
    108. while (isEmpty()){
    109. //如果队列为空,让当前线程阻塞
    110. headWaits.await();
    111. }
    112. //如果队列不为空,可以取了
    113. E e = array[head];
    114. array[head] = null; //help GC
    115. //判断head是否越界
    116. if(++head == array.length){
    117. head = 0;
    118. }
    119. //让size-1
    120. size--;
    121. //这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
    122. tailWaits.signal(); //唤醒offer线程
    123. return e;
    124. }finally {
    125. //释放锁
    126. reentrantLock.unlock();
    127. }
    128. }
    129. /**
    130. * 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
    131. * @param timeout 超时时间
    132. * @return 队头元素
    133. * @throws InterruptedException
    134. */
    135. @Override
    136. public E poll(long timeout) throws InterruptedException {
    137. //先加锁
    138. reentrantLock.lock();
    139. try {
    140. long t = TimeUnit.MILLISECONDS.toNanos(timeout);
    141. //先循环判断队列是否为空
    142. while (isEmpty()){
    143. //如果队列为空,让当前线程阻塞
    144. if(t <= 0){
    145. return null;
    146. }
    147. t = headWaits.awaitNanos(t);
    148. }
    149. //如果队列不为空,可以取了
    150. E e = array[head];
    151. array[head] = null; //help GC
    152. //判断head是否越界
    153. if(++head == array.length){
    154. head = 0;
    155. }
    156. //让size-1
    157. size--;
    158. //这时队列因为取出了一个元素,所以肯定不为满,通知offer线程可以添加元素了
    159. tailWaits.signal(); //唤醒offer线程
    160. return e;
    161. }finally {
    162. //释放锁
    163. reentrantLock.unlock();
    164. }
    165. }
    166. @Override
    167. public boolean isFull() {
    168. return size == array.length;
    169. }
    170. @Override
    171. public boolean isEmpty() {
    172. return size == 0;
    173. }
    174. @Override
    175. public String toString() {
    176. return Arrays.toString(array);
    177. }
    178. }

    3.解释说明

    注释中也都写了,我在这里再次解释一下:

    (1).offer添加元素

      当我们执行offer方法向队列中添加元素时:

     1. 我们需要先加锁,

     2. 我们判断队列是否为满,

     3.  如果满了,我们需要让当前线程阻塞 ,

      注意:这里使用的是while循环,为什么不用if呢?我们想,如果用if,那么它只会判断一次,如果当某个时刻,队列从满变为不满,这时我们阻塞的offer线程被唤醒,将要去添加元素,但就在此时,另一个offer1线程可能在offer线程添加之前抢先往队列中添加了元素,那么offer线程再去添加就会报错,也就是虚假唤醒(spurious wakeups),使用while循环判断队列是否为满,可以在阻塞线程被唤醒后重新判断队列是否满足条件。如果队列仍然满,线程会继续被阻塞,直到队列有空闲位置。这样可以预防虚假唤醒的问题,确保线程只有在满足条件的情况下才会执行添加元素的操作。

     4.当while条件不成立时,也就是队列有空为,并且此时没有其他线程来争抢,那么就可以往队列中添加元素了 , queue[tail] = e;

    5.判断++tail是否达到了数组末尾位置,如果到了,那么重新调整为0,相当于一个圆圈

    6.让size++

    7.当offer线程向队列中添加元素后,此时队列肯定不为空,我们应该向poll线程发出信号,可以唤醒,相当于操作系统中的信号量机制

    (2)poll取出元素

        1.先加锁

        2.判断队列是否为空,同理使用while循环判断

        3.当队列不为空时,取出队头元素

        4.判断++head是否到数组末尾位置,如果到了,重新置为0

        5.让size--;

        6.当poll线程从队列中取出元素后,队列肯定不为满,我们应该向offer线程发出信号,唤醒offer线程

    4.timeout超时时间

     我们可以为offer和poll设置超时时间,当超过了等待时间,将直接返回,不在执行

     看一下offer方法设置timeout

        /**
                     *  如果是满的,就让当前线程阻塞
                     *  我们用加入时间的方法来阻塞
                     *  注意这里我们每次阻塞完唤醒后,就更新等待时间
                     *  如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
                     *  如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
                     *  如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
                     **/

    poll方法同理... 

    5.测试

     下面让我们来测试一下代码

      

    1. public class TestBlockQueue {
    2. public static void main(String[] args) throws InterruptedException {
    3. ArrayBlockQueue queue = new ArrayBlockQueue<>(4);
    4. queue.offer("task1");
    5. queue.offer("task2");
    6. queue.offer("task3");
    7. queue.offer("task4");
    8. System.out.println(queue);
    9. new Thread(() -> {
    10. System.out.println(Thread.currentThread().getName() + "开始添加元素...");
    11. try {
    12. boolean flag = queue.offer("task5", 4000);
    13. if(flag){
    14. System.out.println(Thread.currentThread().getName() + "添加元素成功....");
    15. }else {
    16. System.out.println(Thread.currentThread().getName()+"添加元素超超时失败....");
    17. }
    18. System.out.println(queue);
    19. } catch (InterruptedException e) {
    20. throw new RuntimeException(e);
    21. }
    22. }, "t1").start();
    23. }
    24. }

     我们给队列初始大小设为4,然后向队列中添加4个任务,此时队列为满,然后我们开启一个t1线程向队列中添加元素,设置超时时间为4s,判断是否能添加成功

    运行:

    1. [task1, task2, task3, task4]
    2. t1开始添加元素...
    3. t1添加元素超超时失败....
    4. [task1, task2, task3, task4]
    5. 进程已结束,退出代码0

    可以看到,过了4s,添加失败,因为我们并没有取出任何元素,所以offer线程一直阻塞直到超时失败!

    下面,我们让主线程先休眠2s,然后取出一个元素,再次观察t1是否能添加成功:

    1. //让主线程休眠2s,然后poll
    2. Thread.sleep(2000);
    3. queue.poll();

    在上面的代码中添加以上代码,然后运行:

    1. [task1, task2, task3, task4]
    2. t1开始添加元素...
    3. t1添加元素成功....
    4. [task5, task2, task3, task4]
    5. 进程已结束,退出代码0

    这次可以看到,task5被成功的添加到了队列中,因为我们在超时时间之前取出了队列的一个元素,队列有了空位,task5就可以添加到队列中了。

    五.基于数组实现双锁的阻塞队列

    1.问题

     上面我们是用一把锁来给offer线程和poll线程加锁,他们两个操作用的同一把锁,这样其实并不好,效率比较低,而且添加和取出操作应该是两个互不影响的操作,是互相解耦的,所以我们应该使用双锁来给他们分别加锁和释放锁

    2.关于size共享变量

      当我们换成双锁后,需要思考一个问题,这个头指针head和尾指针tail,head是poll线程用来取出队列头元素的,tail是offer线程用来向队尾添加元素使用的,所以说head和tail这两个变量是互不影响的,它们分别在各自的线程里使用,但是对于size,在offer线程中,最后要让size++;在poll线程中,最后要让size--,这就是共享的变量了,可能会出现线程安全问题,如果两个线程不是顺序执行的,而是交错执行,就会是size的值发生混乱,所以我们要对size作约束

     在java中,我们可以实现原子类来对变量进行线程安全保护,对于int类型的我们使用AtomicInteger

    加一可以使用getAndIncreament()方法,减一可以使用getAndDecreament()方法

    1. //offer锁
    2. private final ReentrantLock headLock = new ReentrantLock();
    3. //poll锁
    4. private final ReentrantLock tailLock = new ReentrantLock();
    5. //条件对象
    6. private final Condition headWaits = headLock.newCondition();
    7. private final Condition tailWaits = tailLock.newCondition();

    3.死锁问题

      如果我们添加了双锁,那么我们需要设置各自的条件去阻塞和唤醒线程,先看一下offer线程,

    1. @Override
    2. public void offer(E e) throws InterruptedException {
    3. //先加锁
    4. tailLock.lock();
    5. try{
    6. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    7. while (isFull()){
    8. //如果是满的,就让当前线程阻塞
    9. tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
    10. }
    11. //当队列不满时,可以进行添加
    12. array[tail] = e;
    13. //先判断tail是否越界
    14. if(++tail == array.length){
    15. tail = 0;
    16. }
    17. //让size+1
    18. size.getAndIncrement();
    19. headLock.lock();
    20. try {
    21. headWaits.signal(); //poll1
    22. }finally {
    23. headLock.unlock();
    24. }
    25. }finally {
    26. //释放锁
    27. tailLock.unlock();
    28. }
    29. }
    30. }

    注意看,这里我们用headWaits唤醒线程时,它是在tailLock释放锁之前,也就是一个嵌套结构,这样就会导致死锁的发生,

     

    如果tailLock先加锁了,然后headLock也去加锁,之后在offer线程中的headLock想去加锁就加不上了,同理poll线程中的tailLock想加锁也加不上去,他们两个线程互相僵持,陷入了死锁状态,为了防止死锁发生,我们只要把嵌套结构改为平级结构就可以了,这样就能保证一定是释放完锁之后再去加锁,一定可以加锁成功!

     

    1. @Override
    2. public void offer(E e) throws InterruptedException {
    3. //先加锁
    4. tailLock.lock();
    5. try{
    6. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    7. while (isFull()){
    8. //如果是满的,就让当前线程阻塞
    9. tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
    10. }
    11. //当队列不满时,可以进行添加
    12. array[tail] = e;
    13. //先判断tail是否越界
    14. if(++tail == array.length){
    15. tail = 0;
    16. }
    17. //让size+1
    18. size.getAndIncrement();
    19. }finally {
    20. //释放锁
    21. tailLock.unlock();
    22. }
    23. /**
    24. * 平级可以防止死锁
    25. * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
    26. */
    27. headLock.lock();
    28. try {
    29. headWaits.signal(); //poll1
    30. }finally {
    31. headLock.unlock();
    32. }
    33. }

    这样的平级结构就可以防止死锁发生了... 

     4.级联唤醒

      以上的唤醒逻辑,是每次都要进行一次唤醒,这样其实效率还不是最好的,那么我们可不可以减少唤醒的次数来提高效率呢?我们可以通过级联唤醒来实现

    (1)offer中只唤醒一次,其他交给poll线程唤醒

      先看一下offer线程,在最后要随机唤醒一个poll线程,我们让它只唤醒一个线程,剩下的线程交给poll线程自己去唤醒,比如有poll1,poll2,poll3,poll4四个线程,那么我们的offer线程只唤醒poll1线程,然后让poll1去唤醒poll2,poll2去唤醒poll3,以此类推...,这样就可以让offer只执行一次唤醒,提高了效率

     

    1. @Override
    2. public void offer(E e) throws InterruptedException {
    3. //记录一下每次size加一之前的值
    4. int c;
    5. //先加锁
    6. tailLock.lock();
    7. try{
    8. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    9. while (isFull()){
    10. //如果是满的,就让当前线程阻塞
    11. tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
    12. }
    13. //当队列不满时,可以进行添加
    14. array[tail] = e;
    15. //先判断tail是否越界
    16. if(++tail == array.length){
    17. tail = 0;
    18. }
    19. //让size+1
    20. c = size.getAndIncrement();
    21. //如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
    22. if( c+1 > array.length){
    23. tailWaits.signal();
    24. }
    25. }finally {
    26. //释放锁
    27. tailLock.unlock();
    28. }
    29. /**
    30. * 平级可以防止死锁
    31. * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
    32. */
    33. if( c == 0){ //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
    34. headLock.lock();
    35. try {
    36. /**
    37. * 这里我们只唤醒第一个poll1线程,其他的
    38. * 交给poll线程自己唤醒
    39. * 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
    40. */
    41. headWaits.signal(); //poll1
    42. }finally {
    43. headLock.unlock();
    44. }
    45. }
    46. }

       我们来看改进后的代码,先设置一个c变量来记录每次size改变前的值,在唤醒时,先判断c是否等于0,这里等于0说明队列从空的状态开始,去添加第一个元素,那么也就是第一个offer1线程,让它去唤醒poll线程,其他的poll线程交给poll线程自己去唤醒,再看一下poll的代码:

    1. @Override
    2. public E poll() throws InterruptedException {
    3. E e;
    4. int c;
    5. //先加锁
    6. headLock.lock();
    7. try {
    8. //先循环判断队列是否为空
    9. while (isEmpty()){
    10. //如果队列为空,让当前线程阻塞
    11. headWaits.await();
    12. }
    13. //如果队列不为空,可以取了
    14. e = array[head];
    15. array[head] = null; //help GC
    16. //判断head是否越界
    17. if(++head == array.length){
    18. head = 0;
    19. }
    20. //让size-1
    21. c = size.getAndDecrement();
    22. //在这里让poll1唤醒poll2,poll2接着唤醒poll3...
    23. if( c > 1 ){
    24. //c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
    25. headWaits.signal();
    26. }
    27. }finally {
    28. //释放锁
    29. headLock.unlock();
    30. }
    31. /**
    32. * 平级,防止死锁
    33. * 唤醒offer线程,需要加锁
    34. * 当 c == array.length时,说明队列从满变为不满,这时才去
    35. * 给tailLock加锁
    36. */
    37. if( c == array.length ){
    38. tailLock.lock();
    39. try {
    40. tailWaits.signal();
    41. }finally {
    42. tailLock.unlock();
    43. }
    44. }
    45. return e;
    46. }

    在poll代码中,只需要判断c是否大于1,如果c>1,说明队列中还有元素,可以继续唤醒,那么就让poll1去唤醒poll2,poll2去唤醒poll3.....

    (2)poll中只唤醒一次,其他交给offer线程唤醒

    再来看poll中,当c==array.length时,说明这时是队列从满变为不满,只有这时才去唤醒,其他情况,比如队列时不满的,也不去唤醒,

    然后在offer中,当c+1

    5.完整代码

    1. /**
    2. * 基于数组的双锁实现阻塞队列
    3. */
    4. public class ArrayDLBlockQueue implements BlockingQueue {
    5. private final E[] array;
    6. private int head;
    7. private int tail;
    8. //在双锁条件下,size是共享的变量,需要保证原子性
    9. private AtomicInteger size = new AtomicInteger();
    10. @SuppressWarnings("all")
    11. public ArrayDLBlockQueue(int capacity){
    12. array = (E[]) new Object[capacity];
    13. }
    14. //offer锁
    15. private final ReentrantLock headLock = new ReentrantLock();
    16. //poll锁
    17. private final ReentrantLock tailLock = new ReentrantLock();
    18. //条件对象
    19. private final Condition headWaits = headLock.newCondition();
    20. private final Condition tailWaits = tailLock.newCondition();
    21. /**
    22. * 向队尾添加元素,如果队列为满,则阻塞当前线程
    23. * @param e 要添加元素
    24. * @throws InterruptedException
    25. */
    26. @Override
    27. public void offer(E e) throws InterruptedException {
    28. //记录一下每次size加一之前的值
    29. int c;
    30. //先加锁
    31. tailLock.lock();
    32. try{
    33. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    34. while (isFull()){
    35. //如果是满的,就让当前线程阻塞
    36. tailWaits.await(); //因为是向队尾添加元素,所以用tailWaits
    37. }
    38. //当队列不满时,可以进行添加
    39. array[tail] = e;
    40. //先判断tail是否越界
    41. if(++tail == array.length){
    42. tail = 0;
    43. }
    44. //让size+1
    45. c = size.getAndIncrement();
    46. //如果c+1>array.length,说明队列还是有空位置,就自己唤醒后面的线程
    47. if( c+1 > array.length){
    48. tailWaits.signal();
    49. }
    50. }finally {
    51. //释放锁
    52. tailLock.unlock();
    53. }
    54. /**
    55. * 平级可以防止死锁
    56. * 唤醒poll线程,但是因为是各自不同的锁,所以需要在他们各自的锁内唤醒
    57. */
    58. if( c == 0){ //offer_0,offer_1,offer_2, c=0说明是第一个offer_0,队列时空的,准备添加第一个
    59. headLock.lock();
    60. try {
    61. /**
    62. * 这里我们只唤醒第一个poll1线程,其他的
    63. * 交给poll线程自己唤醒
    64. * 如果c为0,说明队列为空,准备要添加第一个元素,就只让offer_0来唤醒
    65. */
    66. headWaits.signal(); //poll1
    67. }finally {
    68. headLock.unlock();
    69. }
    70. }
    71. }
    72. /**
    73. * 向队尾添加元素,加入超时时间,如果队满,并且过了超时时间,返回false
    74. * @param e 要添加的元素
    75. * @param timeout 超时时间
    76. * @return 是否添加成功
    77. * @throws InterruptedException
    78. */
    79. @Override
    80. public boolean offer(E e, long timeout) throws InterruptedException {
    81. int c;
    82. //先加锁
    83. tailLock.lock();
    84. try{
    85. //将传入的时间转换为纳秒
    86. long t = TimeUnit.MILLISECONDS.toNanos(timeout);
    87. //判断队列是否为满,这里要用while循环来判断,防止虚假唤醒
    88. while (isFull()){
    89. /**
    90. * 如果是满的,就让当前线程阻塞
    91. * 我们用加入时间的方法来阻塞
    92. * 注意这里我们每次阻塞完唤醒后,就更新等待时间
    93. * 如果要等待的时间是5s,那么如果等待了1s发现队列有空,那么唤醒之后,
    94. * 如果是虚假唤醒,就要再次等待,那么下次等待时间就是4s
    95. * 如果等待时间 t<=0了,说明等待超时,直接返回false,不要在等了
    96. **/
    97. if(t <= 0){
    98. return false;
    99. }
    100. t = tailWaits.awaitNanos(t);
    101. }
    102. //当队列不满时,可以进行添加
    103. array[tail] = e;
    104. //先判断tail是否越界
    105. if(++tail == array.length){
    106. tail = 0;
    107. }
    108. //让size+1
    109. c = size.getAndIncrement();
    110. if(c+1 < array.length){
    111. tailWaits.signal();
    112. }
    113. }finally {
    114. //释放锁
    115. tailLock.unlock();
    116. }
    117. /**
    118. * 平级防止死锁
    119. */
    120. if( c==0){
    121. headLock.lock();
    122. try {
    123. headWaits.signal();
    124. }finally {
    125. headLock.unlock();
    126. }
    127. }
    128. return true;
    129. }
    130. /**
    131. * 移除队头元素,如果队列为空,则阻塞当前线程
    132. * @return 队头元素
    133. * @throws InterruptedException
    134. */
    135. @Override
    136. public E poll() throws InterruptedException {
    137. E e;
    138. int c;
    139. //先加锁
    140. headLock.lock();
    141. try {
    142. //先循环判断队列是否为空
    143. while (isEmpty()){
    144. //如果队列为空,让当前线程阻塞
    145. headWaits.await();
    146. }
    147. //如果队列不为空,可以取了
    148. e = array[head];
    149. array[head] = null; //help GC
    150. //判断head是否越界
    151. if(++head == array.length){
    152. head = 0;
    153. }
    154. //让size-1
    155. c = size.getAndDecrement();
    156. //在这里让poll1唤醒poll2,poll2接着唤醒poll3...
    157. if( c > 1 ){
    158. //c>1说明队列中还有不止一个元素,可以继续唤醒其他poll线程来去元素
    159. headWaits.signal();
    160. }
    161. }finally {
    162. //释放锁
    163. headLock.unlock();
    164. }
    165. /**
    166. * 平级,防止死锁
    167. * 唤醒offer线程,需要加锁
    168. * 当 c == array.length时,说明队列从满变为不满,这时才去
    169. * 给tailLock加锁
    170. */
    171. if( c == array.length ){
    172. tailLock.lock();
    173. try {
    174. tailWaits.signal();
    175. }finally {
    176. tailLock.unlock();
    177. }
    178. }
    179. return e;
    180. }
    181. /**
    182. * 移除队头元素,加入超时时间,如果队空,并且超过等待时间,返回null
    183. * @param timeout 超时时间
    184. * @return 队头元素
    185. * @throws InterruptedException
    186. */
    187. @Override
    188. public E poll(long timeout) throws InterruptedException {
    189. E e;
    190. int c;
    191. //先加锁
    192. headLock.lock();
    193. try {
    194. long t = TimeUnit.MILLISECONDS.toNanos(timeout);
    195. //先循环判断队列是否为空
    196. while (isEmpty()){
    197. //如果队列为空,让当前线程阻塞
    198. if(t <= 0){
    199. return null;
    200. }
    201. t = headWaits.awaitNanos(t);
    202. }
    203. //如果队列不为空,可以取了
    204. e = array[head];
    205. array[head] = null; //help GC
    206. //判断head是否越界
    207. if(++head == array.length){
    208. head = 0;
    209. }
    210. //让size-1
    211. c = size.getAndDecrement();
    212. if( c > 1){
    213. headWaits.signal();
    214. }
    215. }finally {
    216. //释放锁
    217. headLock.unlock();
    218. }
    219. /**
    220. * 平级,防止死锁
    221. * 唤醒offer线程,需要加锁
    222. */
    223. if( c == array.length ){
    224. tailLock.lock();
    225. try {
    226. tailWaits.signal();
    227. }finally {
    228. tailLock.unlock();
    229. }
    230. }
    231. return e;
    232. }
    233. @Override
    234. public boolean isFull() {
    235. return size.get() == array.length;
    236. }
    237. @Override
    238. public boolean isEmpty() {
    239. return size.get() == 0;
    240. }
    241. @Override
    242. public String toString() {
    243. return Arrays.toString(array);
    244. }

    以上就是对阻塞队列的分析了,读者可以在此基础上实现链表的实现等,我们下期再见!

  • 相关阅读:
    LQ0112 立方和【进制】
    Redis分布式锁
    由ASP.NET Core根据路径下载文件异常引发的探究
    SpringBoot读取配置的方式
    Python爬虫入门
    Jenkis 配置钉钉通知
    C语言对单链表所有操作与一些相关面试题
    机器人课程教师面对的困境有哪些
    接口面试题
    自定义一个中间件(功能:解析POST提交到服务器的表单数据)
  • 原文地址:https://blog.csdn.net/jjhnb123/article/details/134340612