• Android——线程和线程池


    AsyncTask

    使用案例可看Android基础——异步消息处理,需要注意

    • AsyncTask必须在主线程中加载,在ActivityThread的main()中会调用AsyncTask的init()
    • AsyncTask对象必须在主线程中创建及调用execute(),且一个对象只能调用一次
    • 不要直接调用onPreExecute()、onPostExecute()、doInBackground()、onProgressUpdate()
    • execute()串行执行,executeOnexecutor()传入THREAD_POOL_EXECUTOR并行执行

    execute()调用executeOnExecutor()传入sDefaultExecutor,将params封装到WorkerRunnable、FutureTask

    public final AsyncTask execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }
    
    public final AsyncTask executeOnExecutor(Executor exec,
            Params... params) {
        ......
        mStatus = Status.RUNNING;
        onPreExecute();
        mWorker.mParams = params;
        exec.execute(mFuture);
        return this;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    sDefaultExecutor是一个串行线程池,调用其execute()

    • 将上面封装的FutureTask插入到mTasks
    • 如果当前没有正在运行的AsyncTask任务,则调用scheduleNext()
    • scheduleNext()将mTasks中的任务取出,在线程池中执行FutureTask的run()
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
    
    private static class SerialExecutor implements Executor {
        final ArrayDeque mTasks = new ArrayDeque();
        Runnable mActive;
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }
        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
    
    public static final Executor THREAD_POOL_EXECUTOR;
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new SynchronousQueue(), sThreadFactory);
        threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    FutureTask的run()会调用到WorkerRunnable的call()

    • 调用doInBackground()
    • 然后调用postResult(),通过Handler发送MESSAGE_POST_RESULT,这里的Handler实际是InternalHandler
    public AsyncTask() {
        this((Looper) null);
    }
    
    public AsyncTask(@Nullable Handler handler) {
        this(handler != null ? handler.getLooper() : null);
    }
    
    public AsyncTask(@Nullable Looper callbackLooper) {
        mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
            ? getMainHandler()
            : new Handler(callbackLooper);
        mWorker = new WorkerRunnable() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };
        ......
        };
    }
    
    private Result postResult(Result result) {
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult(this, result));
        message.sendToTarget();
        return result;
    }
    
    private Handler getHandler() {
        return mHandler;
    }
    
    private static Handler getMainHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler(Looper.getMainLooper());
            }
            return sHandler;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    InternalHandler会回调onProgressUpdate() / onCancelled() / onPostExecute,为了能够让执行环境切换到主线程,InternalHandler必须在主线程中创建

    private static class InternalHandler extends Handler {
        public InternalHandler(Looper looper) {
            super(looper);
        }
    
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult result = (AsyncTaskResult) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }
    
    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    IntentService

    IntentService继承了Service,且是抽象类,用于执行后台耗时任务,执行完后自动停止

    public class LocalIntentService extends IntentService {
        private static final String TAG = "LocalIntentService";
        public LocalIntentService() {
            super(TAG);
        }
        @Override
        protected void onHandleIntent(@Nullable Intent intent) {
            String action = intent.getStringExtra("action");
            Log.d(TAG, "onHandleIntent: receive action = " + action);
            SystemClock.sleep(5000);
            if ("com.demo.demo0".equals(action)) {
                Log.d(TAG, "onHandleIntent: handle action = " + action);
            }
        }
        @Override
        public void onDestroy() {
            super.onDestroy();
            Log.d(TAG, "onDestroy: ");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    如上,在onHandleIntent()中处理intent中传过来的数据,如下通过startService()调用

    public class MainActivity extends AppCompatActivity {
        private static final String TAG = "MainActivity";
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            Intent service = new Intent(this,LocalIntentService.class);
            service.putExtra("action","com.demo.demo0");
            startService(service);
            service.putExtra("action","com.demo.demo1");
            startService(service);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    打印如下

    onHandleIntent: receive action = com.demo.demo0
    onHandleIntent: handle action = com.demo.demo0
    onHandleIntent: receive action = com.demo.demo1
    onDestroy: 
    
    • 1
    • 2
    • 3
    • 4

    下面分析其原理,onCrete()如下,创建HandlerThread,其调用start(),并用其Looper创建一个ServiceHandler

    @Override
    public void onCreate() {
        super.onCreate();
        HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
        thread.start();
        mServiceLooper = thread.getLooper();
        mServiceHandler = new ServiceHandler(mServiceLooper);
    }
    
    private volatile ServiceHandler mServiceHandler;
    
    private final class ServiceHandler extends Handler {
        public ServiceHandler(Looper looper) {
            super(looper);
        }
    
        @Override
        public void handleMessage(Message msg) {
            onHandleIntent((Intent)msg.obj);
            stopSelf(msg.arg1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    HandlerThread继承了Thread,start()会调用其run(),内部创建Looper,当无需使用时,需调用quit() / quitSafely()终止Looper来终止线程

    @Override
    public void run() {
        mTid = Process.myTid();
        Looper.prepare();
        synchronized (this) {
            mLooper = Looper.myLooper();
            notifyAll();
        }
        Process.setThreadPriority(mPriority);
        onLooperPrepared();
        Looper.loop();
        mTid = -1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    启动服务调用onStartCommand()、onStart(),通过mServiceHandler发送Message,handleMessage()会在HandlerThread中执行(会到创建Handler所在的Looper中执行),回调onHandleIntent()处理耗时任务,处理完后调用stopSelf()停止服务

    @Override
    public int onStartCommand(@Nullable Intent intent, int flags, int startId) {
        onStart(intent, startId);
        return mRedelivery ? START_REDELIVER_INTENT : START_NOT_STICKY;
    }
    
    @Override
    public void onStart(@Nullable Intent intent, int startId) {
        Message msg = mServiceHandler.obtainMessage();
        msg.arg1 = startId;
        msg.obj = intent;
        mServiceHandler.sendMessage(msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    线程池

    • 重用线程池中的线程,可以避免创建和销毁带来的性能开销
    • 控制最大并发数,避免线程之间抢占资源导致阻塞
    • 对线程进行管理,提供定时执行、循环执行等功能

    ThreadPoolExecutor

    通过一系列参数配置线程池

    • corePoolSize:核心线程数,当allowsCoreThreadTimeOut=true且核心线程闲置时间超过keepAliveTime就会终止,否则一直存活
    • maximumPoolSize:最大线程数,超过后接下来的任务会被阻塞
    • keepAliveTime:非核心线程闲置时间,超过后非核心线程会被回收,当allowsCoreThreadTimeOut=true同样作用于核心线程
    • unit:指定keepAliveTime的时间单位
    • workQueue:线程池的任务队列,存储execute()提交的Runnable对象
    • threadFactory:线程工厂,为线程池创建新线程
    • handler:当线程池已满或无法成功执行时,调用其中的rejectedExecution()抛出异常
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 当线程数量 < 核心线程数量,启动核心线程执行任务
    • 当线程数量 >= 核心线程数量,任务插入队列等待
    • 当任务队列已满,若未达到最大线程数量,启动非核心线程执行任务,若已达到最大线程数量,则调用rejectedExecution()

    FixedThreadPool

    只有核心线程且数量固定,闲置时不会被回收(可以更快速响应请求),当线程池的线程都在工作时,新任务处于等待状态直到线程空闲出来

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    CachedThreadPool

    无核心线程,最大线程数量无限制,闲置60s会被回收,当线程池中的线程都在工作时,会创建新线程处理新任务(任何任务都会被立即执行,适合执行大量耗时少的任务),无任务时不占用系统资源

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ScheduledExecutor

    核心线程数量固定,非核心线程数量无限制,闲置10ms后回收,主要用于执行定时任务和具有固定周期的重复任务,schedule()可以延迟运行,scheduleAtFixedRate()可以在延迟后每隔一段时间运行

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    • 1
    • 2
    • 3
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
    
     public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE,
               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
               new DelayedWorkQueue());
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    SingleThreadExecutor

    只有一个核心线程,确保所有任务都在同一线程中按顺序执行,统一所有任务到一个线程,不需要处理线程同步问题

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    阿里P8熬了一个月肝出这份32W字Java面试手册,在Github标星31K+
    countdownlatch 和 completableFuture 和 CyclicBarrier
    求求你别在用if进行参数校验了!!!
    【AOP系列】AOP场景实践
    java序列化与反序列化
    三、博客首页完成《iVX低代码仿CSDN个人博客制作》
    实例分享:低投资也可实现的企业信息化综合运维管理
    NOIP2023模拟13联测34 competition
    opencv入门笔记(一)
    Matlab自学笔记二十四:字符串的关系运算和比较
  • 原文地址:https://blog.csdn.net/qq_35258036/article/details/132585454