• Java 进阶多线程(二)


    文章目录

    一、线程通信(了解)

    • 概念

    线程通信就是线程间相互发送数据,线程间共享一个资源即可实现线程通信

    • 常见方式

    通过共享一个数据的方式实现

    根据共享数据的情况决定自己该怎么做,以及通知其他线程怎么做

    • 场景

    生产者与消费者模型:生产者线程负责生产数据,消费者线程负责消费生产者产生的数据

    • 要求

    生产者线程生产完数据后唤醒消费者,然后等待自己,消费者消费完该数据后唤醒生产者,然后等待自己

    方法名解释
    void wait​()当前线程等待,直到另一个线程调用notify() 或 notifyAll()唤醒自己
    void notify()唤醒正在等待对象监视器(锁对象)的单个线程
    void notifyAll()唤醒正在等待对象监视器(锁对象)的所有线程

    二、线程池(重点)

    • 概述

    可以复用线程的技术

    1、获得线程池对象

    ExecutorService实现类ThreadPoolExecutor自创建一个线程池对象

    用Executors(线程池的工具类)调用方法返回不同特点的线程池对象

    1. public ThreadPoolExecutor(
    2. int corePoolSize,//核心线程数
    3. int maximumPoolSize,//最大线程数
    4. long keepAliveTime,//线程空闲时间
    5. TimeUnit unit,//时间单位
    6. BlockingQueue<Runnable> workQueue,//任务队列
    7. ThreadFactory threadFactory,//线程工厂
    8. RejectedExecutionHandler handler//拒绝策略
    9. )
    参数名解释
    corePoolSize指定线程池的线程数量(核心线程),不能小于0
    maximumPoolSize指定线程池可支持的最大线程数,最大数量 >= 核心线程数量
    keepAliveTime指定临时线程的最大存活时间 ,不能小于0
    unit指定存活时间的单位(秒、分、时、天)
    workQueue指定任务队列 ,不能为null
    threadFactory指定用哪个线程工厂创建线程,不能为null
    handler指定线程忙,任务满的时候,新任务来了怎么办,不能为null
    1. TimeUnit.DAYS; //
    2. TimeUnit.HOURS; //小时
    3. TimeUnit.MINUTES; //分钟
    4. TimeUnit.SECONDS; //
    5. TimeUnit.MILLISECONDS; //毫秒
    6. TimeUnit.MICROSECONDS; //微妙
    7. TimeUnit.NANOSECONDS; //纳秒

     

    2、线程池处理Runnable

    ExecutorService方法解释
    void execute(Runnable command)执行任务/命令,没有返回值,一般用来执行 Runnable 任务
    Future submit(Callable task)执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务
    void shutdown()等任务执行完毕后关闭线程池
    List shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务
    4种策略解释
    ThreadPoolExecutor.AbortPolicy丢弃任务并抛出RejectedExecutionException异常。是默认的策略
    ThreadPoolExecutor.DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法
    ThreadPoolExecutor.DiscardOldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中
    ThreadPoolExecutor.CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行

    1)AbortPolicy实战

    当运行任务数超过核心数时,会报RejectedExecutionException错误

    1. import java.util.concurrent.*;
    2. public class ThreadPoolExecutorTest implements Runnable {
    3. private String name;
    4. public ThreadPoolExecutorTest(String name) {
    5. this.name = name;
    6. }
    7. @Override
    8. public void run() {
    9. try {
    10. System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
    11. Thread.sleep(200);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. public static void main(String[] args) {
    17. //线程池
    18. ExecutorService pools = new ThreadPoolExecutor(
    19. 1,
    20. 1,
    21. 1,
    22. TimeUnit.SECONDS,
    23. new ArrayBlockingQueue<>(1),
    24. Executors.defaultThreadFactory(),
    25. new ThreadPoolExecutor.AbortPolicy());
    26. ThreadPoolExecutorTest run = null;
    27. // 循环创建线程
    28. for (int i = 0; i < 5; i++) {
    29. run = new ThreadPoolExecutorTest("" + i);
    30. // 将任务添加到线程池中
    31. pools.execute(run);
    32. }
    33. //关闭线程池
    34. pools.shutdown();
    35. }
    36. }

     

     

    2)DiscardPolicy实战

    1. import java.util.concurrent.*;
    2. public class ThreadPoolExecutorTest implements Runnable {
    3. private String name;
    4. public ThreadPoolExecutorTest(String name) {
    5. this.name = name;
    6. }
    7. @Override
    8. public void run() {
    9. try {
    10. System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
    11. Thread.sleep(200);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. public static void main(String[] args) {
    17. //线程池
    18. ExecutorService pools = new ThreadPoolExecutor(
    19. 1,
    20. 1,
    21. 1,
    22. TimeUnit.SECONDS,
    23. new ArrayBlockingQueue<>(1),
    24. Executors.defaultThreadFactory(),
    25. new ThreadPoolExecutor.DiscardPolicy());
    26. ThreadPoolExecutorTest run = null;
    27. // 循环创建线程
    28. for (int i = 0; i < 5; i++) {
    29. run = new ThreadPoolExecutorTest("" + i);
    30. // 将任务添加到线程池中
    31. pools.execute(run);
    32. }
    33. //关闭线程池
    34. pools.shutdown();
    35. }
    36. }

     

    3)DiscardOldestPolicy实战

    1. import java.util.concurrent.*;
    2. public class ThreadPoolExecutorTest implements Runnable {
    3. private String name;
    4. public ThreadPoolExecutorTest(String name) {
    5. this.name = name;
    6. }
    7. @Override
    8. public void run() {
    9. try {
    10. System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
    11. Thread.sleep(200);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. public static void main(String[] args) {
    17. //线程池
    18. ExecutorService pools = new ThreadPoolExecutor(
    19. 1,
    20. 1,
    21. 1,
    22. TimeUnit.SECONDS,
    23. new ArrayBlockingQueue<>(1),
    24. Executors.defaultThreadFactory(),
    25. new ThreadPoolExecutor.DiscardOldestPolicy());
    26. ThreadPoolExecutorTest run = null;
    27. // 循环创建线程
    28. for (int i = 0; i < 5; i++) {
    29. run = new ThreadPoolExecutorTest("" + i);
    30. // 将任务添加到线程池中
    31. pools.execute(run);
    32. }
    33. //关闭线程池
    34. pools.shutdown();
    35. }
    36. }

     

    4)CallerRunsPolicy实战

    1. import java.util.concurrent.*;
    2. public class ThreadPoolExecutorTest implements Runnable {
    3. private String name;
    4. public ThreadPoolExecutorTest(String name) {
    5. this.name = name;
    6. }
    7. @Override
    8. public void run() {
    9. try {
    10. System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
    11. Thread.sleep(200);
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. public static void main(String[] args) {
    17. //线程池
    18. ExecutorService pools = new ThreadPoolExecutor(
    19. 1,
    20. 1,
    21. 1,
    22. TimeUnit.SECONDS,
    23. new ArrayBlockingQueue<>(1),
    24. Executors.defaultThreadFactory(),
    25. new ThreadPoolExecutor.CallerRunsPolicy());
    26. ThreadPoolExecutorTest run = null;
    27. // 循环创建线程
    28. for (int i = 0; i < 5; i++) {
    29. run = new ThreadPoolExecutorTest("" + i);
    30. // 将任务添加到线程池中
    31. pools.execute(run);
    32. }
    33. //关闭线程池
    34. pools.shutdown();
    35. }
    36. }

     

    3、线程池处理Callable

    ExecutorService方法解释
    void execute(Runnable command)执行任务/命令,没有返回值,一般用来执行 Runnable 任务
    Future submit(Callable task)执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务
    void shutdown()等任务执行完毕后关闭线程池
    List shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务

     

    1. import java.util.concurrent.Callable;
    2. public class MyCallable implements Callable {
    3. private String name;
    4. public MyCallable(String name) {
    5. this.name = name;
    6. }
    7. @Override
    8. public Object call() throws Exception {
    9. for (int i = 0; i < 10; i++) {
    10. System.out.println(Thread.currentThread().getName() + name + " " + i);
    11. }
    12. return true;
    13. }
    14. }
    15. import java.util.concurrent.*;
    16. public class ThreadPoolExecutorTest {
    17. public static void main(String[] args) throws ExecutionException, InterruptedException {
    18. //创建线程池
    19. ExecutorService pools = Executors.newFixedThreadPool(3);
    20. MyCallable myCallable1 = new MyCallable(" hadoop");
    21. MyCallable myCallable2 = new MyCallable(" flink");
    22. MyCallable myCallable3 = new MyCallable(" spark");
    23. //提交执行
    24. Future<Boolean> sb1 = pools.submit(myCallable1);
    25. Future<Boolean> sb2 = pools.submit(myCallable2);
    26. Future<Boolean> sb3 = pools.submit(myCallable3);
    27. //获取结果
    28. boolean b1 = sb1.get();
    29. boolean b2 = sb2.get();
    30. boolean b3 = sb3.get();
    31. System.out.println(b1);
    32. System.out.println(b2);
    33. System.out.println(b3);
    34. // 关闭线程池,如不关闭,线程池会一直运行
    35. pools.shutdown();
    36. }
    37. }

    4、Callable和Runnable接口的区别

    Callable接口中的Call方法有返回值,Runnable接口中的Run方法没有返回值

    Callable接口中的Call方法有声明异常,Runnable接口中的Run方法没有异常

  • 相关阅读:
    DALL·E 3:OpenAI的革命性图像生成模型与ChatGPT的融合
    五、DMSQL
    使用VUE3.0版本搭建H5模板
    使用D435i+Avia跑Fast-LIVO
    leetCode 416.分割等和子集 + 01背包 + 动态规划 + 记忆化搜索 + 递推 + 空间优化
    【C++位图】1. 快速查找某个数据是否在一个集合中 2. 排序(全部插入,遍历一遍) 3. 求两个集合的交集、并集等
    时间复杂度与空间复杂度
    SpringDoc上传附件或文件 - Swagger3
    Linux操作系统:函数、CronTab及定时任务和站点可用性监测
    アィシャ / 艾夏
  • 原文地址:https://blog.csdn.net/java_lujj/article/details/126901005