• 案例复现,带你分析Priority Blocking Queue比较器异常导致的NPE问题


    摘要:本文通过完整的案例复现来演示在什么情况会触发该问题,同时给出了处理建议。希望读者在编程时加以借鉴,避免再次遇到此类问题。

    本文分享自华为云社区《Priority Blocking Queue比较器异常导致的NPE问题分析》,作者:谢照昆、王嘉伟。

    编者按:笔者在使用PriorityBlockingQueue实现按照优先级处理任务时遇到一类NPE问题,经过分析发现根本原因是在任务出队列时调用比较器异常,进而导致后续任务出队列抛出NullPointerException。本文通过完整的案例复现来演示在什么情况会触发该问题,同时给出了处理建议。希望读者在编程时加以借鉴,避免再次遇到此类问题。

    背景知识

    PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,使用一个全局ReentrantLock来控制某一时刻只有一个线程可以进行元素出队和入队操作,并且每次出队都返回优先级别最高的或者最低的元素。PriorityBlockingQueue通过以下两种方式实现元素优先级排序:

    1. 入队元素实现Comparable接口来比较元素优先级;
    2. PriorityBlockingQueue构造函数指定Comparator来比较元素优先级;

    关于PriorityBlockingQueue中队列操作的部分,基本和PriorityQueue逻辑一致,只不过在操作时加锁了。在本文中我们主要关注PriorityBlockingQueue出队的take方法,该方法通过调用dequeue方法将元素出队列。当没有元素可以出队的时候,线程就会阻塞等待。

    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. lock.lockInterruptibly();
    4. E result;
    5. try {
    6. // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
    7. while ( (result = dequeue()) == null)
    8. notEmpty.await();
    9. } finally {
    10. lock.unlock();
    11. }
    12. return result;
    13. }

    现象

    在某个业务服务中使用PriorityBlockingQueue实现按照优先级处理任务,某一天环境中的服务突然间不处理任务了,查看后台日志,发现一直抛出NullPointerException。将进程堆dump出来,使用MAT发现某个PriorityBlockingQueue中的size值比实际元素个数多1个(入队时已经对任务进行非空校验)。

    异常堆栈如下:

    1. java.lang.NullPointerException
    2. at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
    3. at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
    4. at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
    5. ...

    MAT结果:

    原因分析

    在此我们分析下PriorityBlockingQueue是如何出队列的,PriorityBlockingQueue最终通过调用dequeue方法出队列,dequeue方法处理逻辑如下:

    1. 将根节点(array[0])赋值给result;
    2. array[n] 赋值给 arrary[0];
    3. 将 array[n] 设置为 null;
    4. 调用siftDownComparable或siftDownUsingComparator对队列元素重新排序;
    5. size大小减1;
    6. 返回result;

    如果在第4步中出现异常,就会出现队列中的元素个数比实际的元素个数多1个的现象。此时size未发生改变,arry[n]已经被置为null,再进行siftDown操作时就会抛出NullPointerException。继续分析第4步中在什么情况下会出现异常,通过代码走读我们可以发现只有在调用Comparable#compareTo或者Comparator#compare方法进行元素比较的时候才可能出现异常。这块代码的处理逻辑和业务相关,如果业务代码处理不当抛出异常,就会导致上述现象。

    1. /**
    2. * Mechanics for poll(). Call only while holding lock.
    3. */
    4. private E dequeue() {
    5. int n = size - 1;
    6. if (n < 0)
    7. return null;
    8. else {
    9. Object[] array = queue;
    10. E result = (E) array[0]; //step1
    11. E x = (E) array[n]; //step2
    12. array[n] = null; //step3
    13. Comparator super E> cmp = comparator;
    14. if (cmp == null) //step4 如果指定了comparator,就按照指定的comparator来比较。否则就按照默认的
    15. siftDownComparable(0, x, array, n);
    16. else
    17. siftDownUsingComparator(0, x, array, n, cmp);
    18. size = n; //step5
    19. return result; //step6
    20. }
    21. }
    22. private static void siftDownComparable(int k, T x, Object[] array, int n) {
    23. if (n > 0) {
    24. Comparable super T> key = (Comparable super T>)x;
    25. int half = n >>> 1;
    26. while (k < half) {
    27. int child = (k << 1) + 1;
    28. Object c = array[child];
    29. int right = child + 1;
    30. if (right < n && ((Comparable super T>) c).compareTo((T) array[right]) > 0)
    31. c = array[child = right];
    32. if (key.compareTo((T) c) <= 0)
    33. break;
    34. array[k] = c;
    35. k = child;
    36. }
    37. array[k] = key;
    38. }
    39. }
    40. private static void siftDownUsingComparator(int k, T x, Object[] array, int n,
    41. Comparator super T> cmp) {
    42. if (n > 0) {
    43. int half = n >>> 1;
    44. while (k < half) {
    45. int child = (k << 1) + 1;
    46. Object c = array[child];
    47. int right = child + 1;
    48. if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
    49. c = array[child = right];
    50. if (cmp.compare(x, (T) c) <= 0)
    51. break;
    52. array[k] = c;
    53. k = child;
    54. }
    55. array[k] = x;
    56. }
    57. }

    复现代码

    1. import java.util.ArrayList;
    2. import java.util.List;
    3. import java.util.concurrent.PriorityBlockingQueue;
    4. public class PriorityBlockingQueueTest {
    5. static class Entity implements Comparable {
    6. private int id;
    7. private String name;
    8. private boolean flag;
    9. public void setFlag(boolean flag) {
    10. this.flag = flag;
    11. }
    12. public Entity(int id, String name) {
    13. this.id = id;
    14. this.name = name;
    15. }
    16. @Override
    17. public int compareTo(Entity entity) {
    18. if(flag) {
    19. throw new RuntimeException("Test Exception");
    20. }
    21. if (entity == null || this.id > entity.id) {
    22. return 1;
    23. }
    24. return this.id == entity.id ? 0 : -1;
    25. }
    26. }
    27. public static void main(String[] args) {
    28. int num = 5;
    29. PriorityBlockingQueue priorityBlockingQueue = new PriorityBlockingQueue<>();
    30. List entities = new ArrayList<>();
    31. for (int i = 0; i < num; i++) {
    32. Entity entity = new Entity(i, "entity" + i);
    33. entities.add(entity);
    34. priorityBlockingQueue.offer(entity);
    35. }
    36. entities.get(num - 1).setFlag(true);
    37. int size = entities.size();
    38. for (int i = 0; i < size; i++) {
    39. try {
    40. priorityBlockingQueue.take();
    41. } catch (Exception e) {
    42. e.printStackTrace();
    43. }
    44. }
    45. }

    执行结果如下:

    1. java.lang.RuntimeException: Test Exception
    2. at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:31)
    3. at PriorityBlockingQueueTest$Entity.compareTo(PriorityBlockingQueueTest.java:8)
    4. at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
    5. at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
    6. at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
    7. at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)
    8. java.lang.NullPointerException
    9. at java.util.concurrent.PriorityBlockingQueue.siftDownComparable(PriorityBlockingQueue.java:404)
    10. at java.util.concurrent.PriorityBlockingQueue.dequeue(PriorityBlockingQueue.java:333)
    11. at java.util.concurrent.PriorityBlockingQueue.take(PriorityBlockingQueue.java:548)
    12. at PriorityBlockingQueueTest.main(PriorityBlockingQueueTest.java:71)

    规避方案

    可以通过以下两种方法规避:

    • 在take方法出现NPE时,清除队列元素,将未处理的元素重新进入队列;
    • 在 Comparable#compareTo 或 Comparator#compare 方法中做好异常处理,对异常情况进行默认操作;

    建议使用后者。

    案例引申

    使用PriorityBlockingQueue作为缓存队列来创建线程池时,使用submit提交任务会出现 java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to 异常,而使用execute没有问题。

    观察submit源码可以发现在submit内部代码会将Runable封装成RunnableFuture对象,然后调用execute提交任务。

    1. public Future> submit(Runnable task) {
    2. if (task == null) throw new NullPointerException();
    3. RunnableFuture<Void> ftask = newTaskFor(task, null);
    4. execute(ftask);
    5. return ftask;
    6. }

    以Comparable为例,任务入队列时,最终会调用siftUpComparable方法。该方法第一步将RunnableFuture强转为Comparable类型,而RunnableFuture类未实现Comparable接口,进而抛出ClassCastException异常。

    1. public boolean offer(E e) {
    2. if (e == null)
    3. throw new NullPointerException();
    4. final ReentrantLock lock = this.lock;
    5. lock.lock();
    6. int n, cap;
    7. Object[] array;
    8. while ((n = size) >= (cap = (array = queue).length))
    9. tryGrow(array, cap);
    10. try {
    11. Comparator super E> cmp = comparator;
    12. if (cmp == null)
    13. siftUpComparable(n, e, array);
    14. else
    15. siftUpUsingComparator(n, e, array, cmp);
    16. size = n + 1;
    17. notEmpty.signal();
    18. } finally {
    19. lock.unlock();
    20. }
    21. return true;
    22. }
    23. private static void siftUpComparable(int k, T x, Object[] array) {
    24. Comparable super T> key = (Comparable super T>) x;
    25. while (k > 0) {
    26. int parent = (k - 1) >>> 1;
    27. Object e = array[parent];
    28. if (key.compareTo((T) e) >= 0)
    29. break;
    30. array[k] = e;
    31. k = parent;
    32. }
    33. array[k] = key;
    34. }

    这也是常见的比较器调用异常案例,本文不再赘述,可自行参考其他文章。

    总结

    在使用PriorityBlockingQueue时,注意在比较器中做好异常处理,避免出现类似问题。

    后记

    如果遇到相关技术问题(包括不限于毕昇 JDK),可以进入毕昇 JDK 社区查找相关资源(点击阅读原文进入官网),包括二进制下载、代码仓库、使用教学、安装、学习资料等。

     点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    ABAP BOM按层级删除数据
    redis的启动方式
    使用MATLAB进行傅里叶变换
    Redis数据类型之Stream系列一
    金仓数据库KingbaseES客户端编程接口指南-Nodejs(3. Nodejs驱动使用说明)
    ns-3-model-library wifi 浅析_ns-3wifi部分解析_ns-3网络模拟器wifi部分文档分析_Part3
    docker engine stopped
    从入门到精通:Java三目运算符详细教程!
    米尔AM62x核心板,高配价低,AM335x升级首选
    Flume最简单使用
  • 原文地址:https://blog.csdn.net/devcloud/article/details/126282112