• 3 线程池-扩展


    线程池系列文章

    1 手写线程池
    2 ThreadPoolExecutor分析
    3 线程池扩展

    线程池扩展

    针对线程池的一些常见使用场景和缺陷,提供一些扩展。

    业务名称

    线程池中线程的名字是没有任务业务含义的,因此最好是修改线程名称,出现问题后,方便定位。

    new ThreadPoolExecutor(1, 1, 1, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("bizName_" + t.getName());
                    return t;
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用自定义线程工程时,注意以下几点:

    1. Thread t = new Thread®;一定要把runnable作为构造参数。
    2. return t; 方法的实现类,默认返回null,请注意一定要修改。
      上述自定义线程池,也可以抽出来,写个NamedThreadFactory。优化如下:
    public class NamedThreadFactory implements ThreadFactory {
        private String bizName;
    
        public NamedThreadFactory(String bizName) {
            this.bizName = bizName;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName(bizName + "_" + t.getName());
            return t;
        }
    }
    
    new ThreadPoolExecutor(1, 1, 1, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(), new NamedThreadFatory("test"));
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    业务异常捕获

    按照线程池的正常逻辑,是无法获取异常信息,那么如何才能获取到异常信息。
    1 业务的Runnable方法自己捕获异常,自己处理,可以很好的自定义。
    2 线程池在每个task执行完成后,会执行afterExecute(task, thrown);方法,因此可以子线程池重写该方法,拿到异常,然后分发。
    方案1的缺陷是,需要手写全局的try catch,但是可以预防因为业务异常导致线程池中的线程退出。
    方案2的缺陷:无法阻止业务异常导致线程退出;需要重写子类;需要处理异常的分发。

    建议在方案1的基础上进行优化,抽象出BizRunnable,由子类继承,子类实现业务内容和异常处理接口,BizRunnable全局try catch。
    示例代码:

    /**采用泛型,解决bizInfo的传递问题*/
    public abstract class BizRunnable<V> implements Runnable {
        /**
         * 业务数据
         */
        private V bizInfo;
    
    
        @Override
        public void run() {
            try {
                bizRun();
            } catch (Exception exception) {
                exceptionHandler(exception, bizInfo);
            }
        }
    
        /**
         * 执行业务功能
         */
        public abstract void bizRun();
    
    
        /**
         * 子类重写,提供异常处理
         *
         * @param exception 异常信息
         * @param v         业务信息
         */
        protected void exceptionHandler(Exception exception, V v) {
            System.out.printf("业务异常:bizInfo:{}", v.toString(), exception);
        }
    
        public V getBizInfo() {
            return bizInfo;
        }
    
        public void setBizInfo(V bizInfo) {
            this.bizInfo = bizInfo;
        }
    }
    
    class MyBizRunnable extends BizRunnable<User> {
    
        public MyBizRunnable(User user) {
            setBizInfo(user);
        }
    
        @Override
        public void bizRun() {
            //TODO user操作
    
        }
    
    
        public void exceptionHandler(Exception e, User user) {
            //TODO 根据异常类型自动处理
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    通过泛型约束BizRunnable的业务内容,对业务数据有较严格的约束,也可以直接对业务数据先序列化,再传递给exceptionHanlder,由exceptionHeanlder进行反序列化。

    动态调整线程数

    ThreadPoolExecutor针对corePoolSize和maximumPoolSize,全部提供set方法,而且两个属性都是volatile,因此可以直接调用setCorePoolSize和setMaximumPoolSize进行调整。

    修改核心线程数:
    1:减少核心线程数,需要尝试唤醒可能因为getTask阻塞的线程,让线程动起来,进行退出校验。
    2. 增大核心线程数,并且队列不为空,那么默认添加核心线程缺少的线程数。

    public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    public void setMaximumPoolSize(int maximumPoolSize) {
            if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
                throw new IllegalArgumentException();
            this.maximumPoolSize = maximumPoolSize;
            if (workerCountOf(ctl.get()) > maximumPoolSize)
            	//当前线程数大于最大线程数,需要进行线程退出,调用interrupt就是唤醒阻塞在getTask的线程退出。
                interruptIdleWorkers();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    修改线程池的执行顺序

    正常的执行顺序是:核心线程数->入队列->最大线程数,那么是否可以修改相关的顺序,例如:核心线程数->入队列(队列数据超过阈值)->最大线程数(达到最大线程数)->再入队列。
    主要是通过修改任务队列实现,

    public class MyBlockingQueue<E> extends ArrayBlockingQueue<E> {
        private ThreadPoolExecutor executor;
        private Integer threshold;
        private ReentrantLock lock = new ReentrantLock();
    
    
        public MyBlockingQueue(int capacity, int threshold, ThreadPoolExecutor executor) {
            super(capacity);
            this.executor = executor;
            this.threshold = threshold;
        }
    
        /**
         * 重写offer方法,如果队列长度大于阈值
         */
        public boolean offer(E e) {
        	if(executor == null){
    			return  super.offer(e);
    		}
            if (this.size() >= threshold) {
                //队列中的任务已经超过阈值
                if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
                    //当前线程数小于最大线程数,创建线程
                    return false;
                }
            }
            return super.offer(e);
        }
    
    }
    
    
    //创建使用
    public class MaxThreadFirstThreadPool extends MyThreadPool {
    
        public MaxThreadFirstThreadPool(Integer coreThreadSize, Integer maxThreadSize, Integer threadWaitingTime, BlockingQueue queue) {
            super(coreThreadSize, maxThreadSize, threadWaitingTime);
            if (queue instanceof MyBlockingQueue) {
                ((MyBlockingQueue) queue).setExecutor();
            }
        }
    }
    之所以在构造函数中设置queue的executor,是为了防止可能出现的空指针。当然也可以后续设置,代码中进行判断。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    tomcat就是修改了线程池的执行顺序,执行顺序:核心线程数->最大线程数->入队列->决绝策略。

    任务执行超时

    针对线程池配置监控阈值,当任务执行时间超过阈值,进行告警。

    ThreadLocal

    利用ThreadLocal,在线程池的beforeExecute和afterExecute方法之间进行比对。

    public class ExecuteTimed {
        private static ThreadLocal<Long> timedTL = new ThreadLocal<>();
    
        public static void set() {
            timedTL.set(System.currentTimeMillis());
        }
    
        public static Long get() {
            return timedTL.get();
        }
    
        public static Long remove() {
            Long start = timedTL.get();
            timedTL.remove();
            return start;
        }
    }
    
    public class ExecuteTimedThreadPool extends ThreadPoolExecutor {
        //是否开启执行时间监控,默认=true
        private volatile boolean openExecuteTime = true;
    
        public ExecuteTimedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        public void openExecuteTime() {
            openExecuteTime = true;
        }
    
        public void closeExecuteTime() {
            openExecuteTime = false;
        }
    
        /**
         * 任务执行前
         */
        public void beforeExecute(Thread t, Runnable r) {
            if (openExecuteTime) {
                ExecuteTimed.set();
            }
        }
    
        /**
         * 任务执行后
         */
        public void afterExecute(Runnable r, Throwable t) {
            if (openExecuteTime) {
                Long start = ExecuteTimed.remove();
                if (start != null) {
                    if ((System.currentTimeMillis() - start) > 3000) {
                        //执行时间超过3秒,开始告警,注意如果线程名称中带有业务信息,
                        //可以提取出来
                        System.out.println("任务执行超过30秒");
                    }
                }
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    Map记录

    类似于ThreadLocal,key=Thread,value=开始时间

    任务等待超时

    等待超时,从任务提交,到任务beforeExecute的时间差额。但是任务提交是业务线程,beforeExecutor是线程池线程,使用ThreadLocal不太合适,因此需要对Task进行包装。

    public class WaitingTimeTask implements Runnable {
        //实际任务
        private Runnable task;
        //提交时间
        private Long submitTime;
    
        public WaitingTimeTask(Runnable runnable) {
            this.task = runnable;
            this.submitTime = System.currentTimeMillis();
        }
    
        public Long getSubmitTime() {
            return getSubmitTime();
        }
    
        @Override
        public void run() {
            task.run();
        }
    }
    
    
    public class WaitingTimedThreadPool extends ThreadPoolExecutor {
    
        public WaitingTimedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            super.submit(new WaitingTimeTask(command));
        }
    
        /**
         * 任务执行后
         */
        public void afterExecute(Runnable r, Throwable t) {
            if (r instanceof WaitingTimeTask) {
                Long timed = System.currentTimeMillis() - ((WaitingTimeTask) r).getSubmitTime();
                if (timed > 3000) {
                    //超过等待阈值,告警。
                    System.out.println("等待超时");
                }
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    包装Task,添加提交时间,注意:这是事后告警。其实:任务执行超时的开始时间,也可以包装到WaitingTimeTask任务中。

    再拯救一次

    线程池的执行顺序是:核心线程数->入队列->对打线程数->拒绝策略,如果明白线程池的相关业务是CPU密集型的或者很快就可以执行完成的,那么可以再拯救一次,重写拒绝策略。

    public class RetryThreadPool extends ThreadPoolExecutor {
        private RejectedExecutionHandler rejectHandler;
    
        public RetryThreadPool(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            //修改拒绝策略
            this.setRejectedExecutionHandler(new RetryRejectHandler(handler));
        }
    }
    
    class RetryRejectHandler implements RejectedExecutionHandler {
        private RejectedExecutionHandler handler;
    
        public RetryRejectHandler(RejectedExecutionHandler handler) {
            this.handler = handler;
        }
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.getQueue().offer(r)) {
                //再次入队列失败,拯救不活了,执行原来的拒绝策略
                handler.rejectedExecution(r, executor);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    线程池可视化操作

    项目启动后,是无法感知线程池的相关情况,例如:当前活动线程数、修改核心线程数、修改最大线程数、已经处理的任务、剩余任务数、异常任务数等。如果可以有个页面能查看相关信息,那么针对线程的情况,可以进行动态调整参数。

    /**
     * @author jkf
     * @date 2022-09-05 15:40:00
     * @description
     */
    public class VisualizationThreadPool extends ThreadPoolExecutor {
        /**
         * 线程池的名称
         */
        private String name;
        /**
         * 线程池的描述
         */
        private String desc;
    
        private volatile boolean started = false;
    
        public VisualizationThreadPool(String name, String desc, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            try {
                ThreadPoolVisualizationRegistry.registry(this);
                started = true;
            } catch (IllegalArgumentException createException) {
                //存在重名的线程池,关闭创建的线程池
                this.shutdown();
            }
        }
    
    
        /**
         * 重写terminated,主要用于线程池关闭时,移除注册
         */
        protected void terminated() {
            if (started) {
                ThreadPoolVisualizationRegistry.remove(this);
            }
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getDesc() {
            return desc;
        }
        public void setDesc(String desc) {
            this.desc = desc;
        }
    
        public boolean equals(Object obj) {
            if (!(obj instanceof VisualizationThreadPool)) {
                return false;
            }
            return Objects.equals(name, ((VisualizationThreadPool) obj).getName());
        }
    
        public int hashCode() {
            if (StringUtils.isEmpty(name)) {
                return -1;
            }
            return name.hashCode();
        }
    }
    
    public class ThreadPoolVisualizationRegistry {
        private static Map<String, VisualizationThreadPool> threadPoolMap = new ConcurrentHashMap<>();
    
        public static ExecutorService getThreadPool(String name) {
            return null;
        }
    
        public synchronized static void registry(VisualizationThreadPool threadPool) {
            if (threadPoolMap.containsKey(threadPool.getName())) {
                throw new IllegalStateException("可视化threadPool[" + threadPool.getName() + "]已经存在");
            }
            threadPoolMap.put(threadPool.getName(), threadPool);
        }
    
        public synchronized static void remove(VisualizationThreadPool threadPool) {
            threadPoolMap.remove(threadPool);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    通过ThreadPoolVisualizationRegistry已经能够获取到相关的ThreadPoolExecutor,剩余的只需要通过Restful接口或者dubbo进行服务暴露ThreadPoolVisualizationRegistry中的数据。可视化管理服务只需要从注册中心调用相关的暴露API,即可实现页面可视化操作。

  • 相关阅读:
    Spring事务-底层原理分析
    关于Win系统提示由于找不到msvcr120.dll文件问题解决办法
    第5章 - 二阶多智能体系统的协同控制 --> 连续时间系统编队控制
    Javascript 教程
    RunnerGo UI自动化测试脚本如何配置
    Hive创建分区表并插入数据
    MyBatis Plus复合主键问题解析
    一个.NET内置依赖注入的小型强化版
    微服务讲解
    错误边界(Error boundary)
  • 原文地址:https://blog.csdn.net/u010652576/article/details/126715467