• scheduledThreadPool


    我们对java中定时任务实现可能会有以下疑问:

    1. 怎样做到每个任务延迟指定时间执行?

    2. 内部使用了什么数据结构保存延迟任务?

    3. 延迟任务放入scheduledThreadPool时机并不固定,怎么保证按延迟时间顺序执行?

    构造器

    1. public ScheduledThreadPoolExecutor(int corePoolSize) {
    2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    3. new DelayedWorkQueue());
    4. }

    corePoolSize就是我们传过了的参数,maximumPoolSize是Integer.MAX_VALUE,所以最大线程是无穷大,非核心线程成活时间是0,所以非核心线程执行完firstTask之后如果poll任务没拿到任务则会直接销毁。queue是DelayedWorkQueue。但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量

    schedule(Runnable command,long delay, TimeUnit unit)方法

    1. public ScheduledFuture<?>schedule(Runnable command,
    2. long delay,
    3. TimeUnit unit){
    4. if(command == null || unit == null)
    5. throw new NullPointerException();
    6. RunnableScheduledFuture<?> t =decorateTask(command,
    7. new ScheduledFutureTask<Void>(command, null,
    8. triggerTime(delay, unit)));
    9. delayedExecute(t);
    10. return t;
    11. }

    1. 通过decorateTask方法获取到RunnableScheduledFuture(实际上是ScheduledFutureTask对象),并把delay时间变成了时间戳

    2. 执行delayedExecute方法

    delayedExecute方法

    1. private voiddelayedExecute(RunnableScheduledFuture task){
    2. if(isShutdown())
    3. reject(task);
    4. else{
    5. super.getQueue().add(task);
    6. if(isShutdown()&&
    7. !canRunInCurrentRunState(task.isPeriodic())&&
    8. remove(task))
    9. task.cancel(false);
    10. else
    11. ensurePrestart();
    12. }
    13. }

    1. 使用queue.add方法把task放入queue

    2. 执行ensurePrestart方法

    offer方法

    1. public boolean offer(Runnable x){
    2. if(x == null)
    3. throw new NullPointerException();
    4. RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x;
    5. final ReentrantLock lock = this.lock;
    6. lock.lock();
    7. try {
    8. int i = size;
    9. if(i >= queue.length)
    10. grow();
    11. size = i +1;
    12. if(i ==0){
    13. queue[0]= e;
    14. setIndex(e,0);
    15. }else{
    16. siftUp(i, e);
    17. }
    18. if(queue[0]== e){
    19. leader = null;
    20. available.signal();
    21. }
    22. } finally {
    23. lock.unlock();
    24. }
    25. return true;
    26. }

    1. DelayedWorkQueue底层使用的是RunnableScheduledFuture的数组,初始化容量是16,之后扩容是以1.5倍进行。

    2. 在offer元素整个过程中使用ReentrantLock进行加锁,所以DelayedWorkQueue是一个线程安全的队列。然后使用了condition来实现阻塞的功能,当poll没有元素时会使用await进行等待,当offer的是数组的第一个元素时会signal,这个signal的设计是排序的点睛之笔,设计的非常巧妙,这块需要offer和take方法一起来看,在take方法时会拿第一个元素来判断delay的时间,如果时间没到会使用await休眠delay时间,但此时如果有delay时间更短的任务放入queue中,此时需要take的任务就不是之前的那个任务了,就要重新执行逻辑获取这个最新delay的任务,这样才能做到任务的正确执行。

    3. 在offer元素时会使用siftUp方法来保证数组中元素是按delay时间从小到大排列,但要注意的是数组前半部分肯定都是排了delay最小的任务,但后半部分不一定是有序的

    ensurePrestart()方法

    1. voidensurePrestart(){
    2. int wc =workerCountOf(ctl.get());
    3. if(wc < corePoolSize)
    4. addWorker(null, true);
    5. elseif(wc ==0)
    6. addWorker(null, false);
    7. }

    这个比较简单,addWorker方法之前我们也分析过了,需要注意的是这里的firstTask默认是空的,所以工作线程会直接从queue中拿任务。这有个比较奇怪的else if,感觉应该永远不用执行,因为wc==0肯定已经被if条件拦截了,也就是只能起核心线程数。最大线程数永远不会起作用

    poll方法

    1. public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit)
    2. throws InterruptedException {
    3. long nanos = unit.toNanos(timeout);
    4. final ReentrantLock lock = this.lock;
    5. 加锁
    6. lock.lockInterruptibly();
    7. try {
    8. 自旋
    9. for(;;){
    10. 拿到queue中的第一个元素,如果是空则awaitNanos时间,等待时间过后如果queue中还是没有元素则返回null
    11. RunnableScheduledFuture<?> first = queue[0];
    12. if(first == null){
    13. if(nanos <=0)
    14. return null;
    15. else
    16. nanos = available.awaitNanos(nanos);
    17. }else{
    18. 拿到第一个任务的delay时间,如果到了delay时间则返回finishPoll方法的结果
    19. long delay = first.getDelay(NANOSECONDS);
    20. if(delay <=0)
    21. returnfinishPoll(first);
    22. 如果传入的nanos小于等于0则返回null
    23. if(nanos <=0)
    24. return null;
    25. first = null;// don't retain ref while waiting
    26. 如果等待时间还不够或前一个需要执行的任务还在执行,则当前线程直接等待
    27. if(nanos < delay || leader != null)
    28. nanos = available.awaitNanos(nanos);
    29. else{否则当前线程可以执行(leader线程),但需要awaitNanos delay的时间才能执行
    30. Thread thisThread = Thread.currentThread();
    31. leader = thisThread;
    32. try {
    33. long timeLeft = available.awaitNanos(delay);
    34. nanos -= delay - timeLeft;
    35. } finally {
    36. 当等待时间到了之后就 leader = null说明此时可以返回finishPoll方法的结果
    37. if(leader == thisThread)
    38. leader = null;
    39. }
    40. }
    41. }
    42. }
    43. } finally {
    44. if(leader == null && queue[0]!= null)
    45. available.signal();
    46. lock.unlock();
    47. }
    48. }

    1. DelayedWorkQueue的poll方法也是使用reentrantLock来保证线程安全,然后使用condition.awaitNanos来达到等待特定时间的效果,这里使用leader线程保证了排在第一位的任务只有一个工作线程获取到,其他工作线程进行排队等待,在获取到第一个任务的工作线程delay时间到了之后会take到这个任务并signal排队的第一个工作线程继续获取下一个任务,周而复始。

    2. 在使用finishPoll方法返回delay时间到了的任务时会用siftDown对queue后半部分的任务进行排序,因为之前offer时使用siftUp方法只对queue前半部分进行了排序

    3. 回到ScheduledThreadPool线程池,keepAliveTime是0,所以当first任务的delay时间还没有到时会直接返回null,然后非核心工作线程就会直接销毁,之后的代码都不会执行,而核心线程则执行的take方法,take方法才会进入下面这段逻辑

    1. if (leader != null)
    2. available.await();
    3. else {
    4. Thread thisThread = Thread.currentThread();
    5. leader = thisThread;
    6. try {
    7. available.awaitNanos(delay);
    8. } finally {
    9. if (leader == thisThread)
    10. leader = null;
    11. }
    12. }

    scheduleAtFixedRate方法

    1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    2. long initialDelay,
    3. long period,
    4. TimeUnit unit) {
    5. if (command == null || unit == null)
    6. throw new NullPointerException();
    7. if (period <= 0)
    8. throw new IllegalArgumentException();
    9. ScheduledFutureTask<Void> sft =
    10. new ScheduledFutureTask<Void>(command,
    11. null,
    12. triggerTime(initialDelay, unit),
    13. unit.toNanos(period));
    14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    15. sft.outerTask = t;
    16. delayedExecute(t);
    17. return t;
    18. }

    1. 这个方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,这边多了个period变量保存执行周期值,outerTask引用了自身的对象,然后也是使用delayExecute方法把任务放入了queue中,此时任务的delay是initialDelay,所以会在initialDelay时间之后出队然后执行

    2. 由于现在工作线程中的task是ScheduledFutureTask,所以工作线程调用的task.run方法是ScheduledFutureTask.run方法

    ScheduledFutureTask.run方法

    1. public void run() {
    2. boolean periodic = isPeriodic();
    3. if (!canRunInCurrentRunState(periodic))
    4. cancel(false);
    5. else if (!periodic)
    6. ScheduledFutureTask.super.run();
    7. else if (ScheduledFutureTask.super.runAndReset()) {
    8. setNextRunTime();
    9. reExecutePeriodic(outerTask);
    10. }
    11. }

    1.判断是不是周期执行的任务,之前的schedule方法的period是0,所以会执行super.run();然后执行传入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,则会执行super.runAndReset();方法,执行传入的runnable中的run方法之后执行setNextRunTime();

    重新设置delay时间(initialDelay+period),然后把任务又放入queue中

    scheduleWithFixedDelay方法

    1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    2. long initialDelay,
    3. long delay,
    4. TimeUnit unit) {
    5. if (command == null || unit == null)
    6. throw new NullPointerException();
    7. if (delay <= 0)
    8. throw new IllegalArgumentException();
    9. ScheduledFutureTask<Void> sft =
    10. new ScheduledFutureTask<Void>(command,
    11. null,
    12. triggerTime(initialDelay, unit),
    13. unit.toNanos(-delay));
    14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    15. sft.outerTask = t;
    16. delayedExecute(t);
    17. return t;

    这个方法几乎跟scheduleAtFixedRate方法一模一样,区别在于period是个负数,通过之前我们对scheduleAtFixedRate方法的分析,period这个参数在算周期执行间隔时会用到,也就是setNextRunTime方法

    setNextRunTime方法

    1. private void setNextRunTime() {
    2. long p = period;
    3. if (p > 0)
    4. time += p;
    5. else
    6. time = triggerTime(-p);
    7. }

    当period大于0时,也就是scheduleAtFixedRate执行时,是直接在之前的time加上了period,而scheduleWithFixedDelay方法执行时,是用triggerTime方法在当前时间加上了periode,不同的计算方式的区别在于,scheduleAtFixedRate不会管任务的执行时间,我只要保证任务固定频率执行就好了,所以他是几乎精确的period时间执行,而scheduleWithFixedDelay是在任务之后的时间+period时间来确定下一次任务执行的时间,所以任务执行的频率相对来说不固定

    公众号同名,欢迎关注

  • 相关阅读:
    互联网行业中产品经理常用的专业术语有哪些?
    进阶三部曲第一部《Android进阶之光》第2版已出版
    求回复!供热建模方面
    【leetcode每日一题】【滑动窗口长度固定】案例
    『忘了再学』Shell基础 — 31、字符处理相关命令
    【云原生 • DevOps】influxDB、cAdvisor、Grafana 工具使用详解
    JavaWeb项目学习(一)
    树上查询SPOJQTREE4
    elementPlus Pagination 分页怎样变中文
    全连接=可编程!玻色量子成功研制光量子测控一体机——“量枢”
  • 原文地址:https://blog.csdn.net/shidebin/article/details/126819011