我们对java中定时任务实现可能会有以下疑问:
怎样做到每个任务延迟指定时间执行?
内部使用了什么数据结构保存延迟任务?
延迟任务放入scheduledThreadPool时机并不固定,怎么保证按延迟时间顺序执行?
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
corePoolSize就是我们传过了的参数,maximumPoolSize是Integer.MAX_VALUE,所以最大线程是无穷大,非核心线程成活时间是0,所以非核心线程执行完firstTask之后如果poll任务没拿到任务则会直接销毁。queue是DelayedWorkQueue。但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量
- public ScheduledFuture<?>schedule(Runnable command,
- long delay,
- TimeUnit unit){
- if(command == null || unit == null)
- throw new NullPointerException();
- RunnableScheduledFuture<?> t =decorateTask(command,
- new ScheduledFutureTask<Void>(command, null,
- triggerTime(delay, unit)));
- delayedExecute(t);
- return t;
- }
-
通过decorateTask方法获取到RunnableScheduledFuture(实际上是ScheduledFutureTask对象),并把delay时间变成了时间戳
执行delayedExecute方法
- private voiddelayedExecute(RunnableScheduledFuture> task){
- if(isShutdown())
- reject(task);
- else{
- super.getQueue().add(task);
- if(isShutdown()&&
- !canRunInCurrentRunState(task.isPeriodic())&&
- remove(task))
- task.cancel(false);
- else
- ensurePrestart();
- }
- }
-
使用queue.add方法把task放入queue
执行ensurePrestart方法
- public boolean offer(Runnable x){
- if(x == null)
- throw new NullPointerException();
- RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = size;
- if(i >= queue.length)
- grow();
- size = i +1;
- if(i ==0){
- queue[0]= e;
- setIndex(e,0);
- }else{
- siftUp(i, e);
- }
- if(queue[0]== e){
- leader = null;
- available.signal();
- }
- } finally {
- lock.unlock();
- }
- return true;
- }
-
DelayedWorkQueue底层使用的是RunnableScheduledFuture的数组,初始化容量是16,之后扩容是以1.5倍进行。
在offer元素整个过程中使用ReentrantLock进行加锁,所以DelayedWorkQueue是一个线程安全的队列。然后使用了condition来实现阻塞的功能,当poll没有元素时会使用await进行等待,当offer的是数组的第一个元素时会signal,这个signal的设计是排序的点睛之笔,设计的非常巧妙,这块需要offer和take方法一起来看,在take方法时会拿第一个元素来判断delay的时间,如果时间没到会使用await休眠delay时间,但此时如果有delay时间更短的任务放入queue中,此时需要take的任务就不是之前的那个任务了,就要重新执行逻辑获取这个最新delay的任务,这样才能做到任务的正确执行。
在offer元素时会使用siftUp方法来保证数组中元素是按delay时间从小到大排列,但要注意的是数组前半部分肯定都是排了delay最小的任务,但后半部分不一定是有序的
- voidensurePrestart(){
- int wc =workerCountOf(ctl.get());
- if(wc < corePoolSize)
- addWorker(null, true);
- elseif(wc ==0)
- addWorker(null, false);
- }
-
这个比较简单,addWorker方法之前我们也分析过了,需要注意的是这里的firstTask默认是空的,所以工作线程会直接从queue中拿任务。这有个比较奇怪的else if,感觉应该永远不用执行,因为wc==0肯定已经被if条件拦截了,也就是只能起核心线程数。最大线程数永远不会起作用
- public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock lock = this.lock;
- 加锁
- lock.lockInterruptibly();
- try {
- 自旋
- for(;;){
- 拿到queue中的第一个元素,如果是空则awaitNanos时间,等待时间过后如果queue中还是没有元素则返回null。
- RunnableScheduledFuture<?> first = queue[0];
- if(first == null){
- if(nanos <=0)
- return null;
- else
- nanos = available.awaitNanos(nanos);
- }else{
- 拿到第一个任务的delay时间,如果到了delay时间则返回finishPoll方法的结果
- long delay = first.getDelay(NANOSECONDS);
- if(delay <=0)
- returnfinishPoll(first);
- 如果传入的nanos小于等于0则返回null
- if(nanos <=0)
- return null;
- first = null;// don't retain ref while waiting
- 如果等待时间还不够或前一个需要执行的任务还在执行,则当前线程直接等待
- if(nanos < delay || leader != null)
- nanos = available.awaitNanos(nanos);
- else{否则当前线程可以执行(leader线程),但需要awaitNanos delay的时间才能执行
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- long timeLeft = available.awaitNanos(delay);
- nanos -= delay - timeLeft;
- } finally {
- 当等待时间到了之后就 leader = null说明此时可以返回finishPoll方法的结果
- if(leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if(leader == null && queue[0]!= null)
- available.signal();
- lock.unlock();
- }
- }
DelayedWorkQueue的poll方法也是使用reentrantLock来保证线程安全,然后使用condition.awaitNanos来达到等待特定时间的效果,这里使用leader线程保证了排在第一位的任务只有一个工作线程获取到,其他工作线程进行排队等待,在获取到第一个任务的工作线程delay时间到了之后会take到这个任务并signal排队的第一个工作线程继续获取下一个任务,周而复始。
在使用finishPoll方法返回delay时间到了的任务时会用siftDown对queue后半部分的任务进行排序,因为之前offer时使用siftUp方法只对queue前半部分进行了排序
回到ScheduledThreadPool线程池,keepAliveTime是0,所以当first任务的delay时间还没有到时会直接返回null,然后非核心工作线程就会直接销毁,之后的代码都不会执行,而核心线程则执行的take方法,take方法才会进入下面这段逻辑
- if (leader != null)
- available.await();
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period));
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
-
这个方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,这边多了个period变量保存执行周期值,outerTask引用了自身的对象,然后也是使用delayExecute方法把任务放入了queue中,此时任务的delay是initialDelay,所以会在initialDelay时间之后出队然后执行
由于现在工作线程中的task是ScheduledFutureTask,所以工作线程调用的task.run方法是ScheduledFutureTask.run方法
- public void run() {
- boolean periodic = isPeriodic();
- if (!canRunInCurrentRunState(periodic))
- cancel(false);
- else if (!periodic)
- ScheduledFutureTask.super.run();
- else if (ScheduledFutureTask.super.runAndReset()) {
- setNextRunTime();
- reExecutePeriodic(outerTask);
- }
- }
-
1.判断是不是周期执行的任务,之前的schedule方法的period是0,所以会执行super.run();然后执行传入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,则会执行super.runAndReset();方法,执行传入的runnable中的run方法之后执行setNextRunTime();
重新设置delay时间(initialDelay+period),然后把任务又放入queue中
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
- long initialDelay,
- long delay,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- if (delay <= 0)
- throw new IllegalArgumentException();
- ScheduledFutureTask<Void> sft =
- new ScheduledFutureTask<Void>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(-delay));
- RunnableScheduledFuture<Void> t = decorateTask(command, sft);
- sft.outerTask = t;
- delayedExecute(t);
- return t;
-
这个方法几乎跟scheduleAtFixedRate方法一模一样,区别在于period是个负数,通过之前我们对scheduleAtFixedRate方法的分析,period这个参数在算周期执行间隔时会用到,也就是setNextRunTime方法
- private void setNextRunTime() {
- long p = period;
- if (p > 0)
- time += p;
- else
- time = triggerTime(-p);
- }
-
当period大于0时,也就是scheduleAtFixedRate执行时,是直接在之前的time加上了period,而scheduleWithFixedDelay方法执行时,是用triggerTime方法在当前时间加上了periode,不同的计算方式的区别在于,scheduleAtFixedRate不会管任务的执行时间,我只要保证任务固定频率执行就好了,所以他是几乎精确的period时间执行,而scheduleWithFixedDelay是在任务之后的时间+period时间来确定下一次任务执行的时间,所以任务执行的频率相对来说不固定