线程通信就是线程间相互发送数据,线程间共享一个资源即可实现线程通信
通过共享一个数据的方式实现
根据共享数据的情况决定自己该怎么做,以及通知其他线程怎么做
生产者与消费者模型:生产者线程负责生产数据,消费者线程负责消费生产者产生的数据
生产者线程生产完数据后唤醒消费者,然后等待自己,消费者消费完该数据后唤醒生产者,然后等待自己
| 方法名 | 解释 |
|---|---|
void wait() | 当前线程等待,直到另一个线程调用notify() 或 notifyAll()唤醒自己 |
void notify() | 唤醒正在等待对象监视器(锁对象)的单个线程 |
void notifyAll() | 唤醒正在等待对象监视器(锁对象)的所有线程 |
可以复用线程的技术
用ExecutorService实现类ThreadPoolExecutor自创建一个线程池对象
用Executors(线程池的工具类)调用方法返回不同特点的线程池对象
- public ThreadPoolExecutor(
- int corePoolSize,//核心线程数
- int maximumPoolSize,//最大线程数
- long keepAliveTime,//线程空闲时间
- TimeUnit unit,//时间单位
- BlockingQueue<Runnable> workQueue,//任务队列
- ThreadFactory threadFactory,//线程工厂
- RejectedExecutionHandler handler//拒绝策略
- )
| 参数名 | 解释 |
|---|---|
corePoolSize | 指定线程池的线程数量(核心线程),不能小于0 |
maximumPoolSize | 指定线程池可支持的最大线程数,最大数量 >= 核心线程数量 |
keepAliveTime | 指定临时线程的最大存活时间 ,不能小于0 |
unit | 指定存活时间的单位(秒、分、时、天) |
workQueue | 指定任务队列 ,不能为null |
threadFactory | 指定用哪个线程工厂创建线程,不能为null |
handler | 指定线程忙,任务满的时候,新任务来了怎么办,不能为null |
- TimeUnit.DAYS; //天
- TimeUnit.HOURS; //小时
- TimeUnit.MINUTES; //分钟
- TimeUnit.SECONDS; //秒
- TimeUnit.MILLISECONDS; //毫秒
- TimeUnit.MICROSECONDS; //微妙
- TimeUnit.NANOSECONDS; //纳秒
| ExecutorService方法 | 解释 |
|---|---|
void execute(Runnable command) | 执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
Future | 执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务 |
void shutdown() | 等任务执行完毕后关闭线程池 |
List | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |
| 4种策略 | 解释 |
|---|---|
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。是默认的策略 |
ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常 这是不推荐的做法 |
ThreadPoolExecutor.DiscardOldestPolicy | 抛弃队列中等待最久的任务 然后把当前任务加入队列中 |
ThreadPoolExecutor.CallerRunsPolicy | 由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
当运行任务数超过核心数时,会报RejectedExecutionException错误

- import java.util.concurrent.*;
-
- public class ThreadPoolExecutorTest implements Runnable {
-
- private String name;
-
- public ThreadPoolExecutorTest(String name) {
-
- this.name = name;
- }
-
- @Override
- public void run() {
-
- try {
-
- System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
- Thread.sleep(200);
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
-
- //线程池
- ExecutorService pools = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy());
-
- ThreadPoolExecutorTest run = null;
- // 循环创建线程
- for (int i = 0; i < 5; i++) {
-
- run = new ThreadPoolExecutorTest("" + i);
- // 将任务添加到线程池中
- pools.execute(run);
- }
-
- //关闭线程池
- pools.shutdown();
- }
- }

- import java.util.concurrent.*;
-
- public class ThreadPoolExecutorTest implements Runnable {
-
- private String name;
-
- public ThreadPoolExecutorTest(String name) {
-
- this.name = name;
- }
-
- @Override
- public void run() {
-
- try {
-
- System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
- Thread.sleep(200);
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
-
- //线程池
- ExecutorService pools = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardPolicy());
-
- ThreadPoolExecutorTest run = null;
- // 循环创建线程
- for (int i = 0; i < 5; i++) {
-
- run = new ThreadPoolExecutorTest("" + i);
- // 将任务添加到线程池中
- pools.execute(run);
- }
-
- //关闭线程池
- pools.shutdown();
- }
- }

- import java.util.concurrent.*;
-
- public class ThreadPoolExecutorTest implements Runnable {
-
- private String name;
-
- public ThreadPoolExecutorTest(String name) {
-
- this.name = name;
- }
-
- @Override
- public void run() {
-
- try {
-
- System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
- Thread.sleep(200);
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
-
- //线程池
- ExecutorService pools = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.DiscardOldestPolicy());
-
- ThreadPoolExecutorTest run = null;
- // 循环创建线程
- for (int i = 0; i < 5; i++) {
-
- run = new ThreadPoolExecutorTest("" + i);
- // 将任务添加到线程池中
- pools.execute(run);
- }
-
- //关闭线程池
- pools.shutdown();
- }
- }

- import java.util.concurrent.*;
-
- public class ThreadPoolExecutorTest implements Runnable {
-
- private String name;
-
- public ThreadPoolExecutorTest(String name) {
-
- this.name = name;
- }
-
- @Override
- public void run() {
-
- try {
-
- System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
- Thread.sleep(200);
- } catch (InterruptedException e) {
-
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
-
- //线程池
- ExecutorService pools = new ThreadPoolExecutor(
- 1,
- 1,
- 1,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.CallerRunsPolicy());
-
- ThreadPoolExecutorTest run = null;
- // 循环创建线程
- for (int i = 0; i < 5; i++) {
-
- run = new ThreadPoolExecutorTest("" + i);
- // 将任务添加到线程池中
- pools.execute(run);
- }
-
- //关闭线程池
- pools.shutdown();
- }
- }
| ExecutorService方法 | 解释 |
|---|---|
void execute(Runnable command) | 执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
Future | 执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务 |
void shutdown() | 等任务执行完毕后关闭线程池 |
List | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |

- import java.util.concurrent.Callable;
-
- public class MyCallable implements Callable {
-
- private String name;
-
- public MyCallable(String name) {
-
- this.name = name;
- }
-
- @Override
- public Object call() throws Exception {
-
- for (int i = 0; i < 10; i++) {
-
- System.out.println(Thread.currentThread().getName() + name + " " + i);
- }
- return true;
- }
- }
- import java.util.concurrent.*;
-
- public class ThreadPoolExecutorTest {
-
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
-
- //创建线程池
- ExecutorService pools = Executors.newFixedThreadPool(3);
-
- MyCallable myCallable1 = new MyCallable(" hadoop");
- MyCallable myCallable2 = new MyCallable(" flink");
- MyCallable myCallable3 = new MyCallable(" spark");
-
- //提交执行
- Future<Boolean> sb1 = pools.submit(myCallable1);
- Future<Boolean> sb2 = pools.submit(myCallable2);
- Future<Boolean> sb3 = pools.submit(myCallable3);
-
- //获取结果
- boolean b1 = sb1.get();
- boolean b2 = sb2.get();
- boolean b3 = sb3.get();
-
- System.out.println(b1);
- System.out.println(b2);
- System.out.println(b3);
-
- // 关闭线程池,如不关闭,线程池会一直运行
- pools.shutdown();
- }
-
- }
Callable接口中的Call方法有返回值,Runnable接口中的Run方法没有返回值
Callable接口中的Call方法有声明异常,Runnable接口中的Run方法没有异常