| 学习路线指引(点击解锁) | 知识定位 | 人群定位 |
|---|---|---|
| 🧡 Python实战微信订餐小程序 🧡 | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
| 💛Python量化交易实战💛 | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
PS:Java程序启动的时候,JVM就是一个进程,JVM会执行main方法,main方法就是主线程,同时会再启动一个垃圾回收线程(守护线程)GC进行垃圾回收。即:Java最少有两个线程并发,主线程 main 方法和守护线程GC。
在Java语言中,堆内存和方法区内存共享。但是栈内存独立,一个线程一个栈。假设启动10个线程,会有10个栈空间,每个栈和每个栈之间,互不干扰,各自执行各自的,这就是多线程并发。Java中之所以有多线程机制,目的就是为了提高程序的处理效率。
PS:火车站,可以看做是一个进程。火车站中的每一个售票窗口可以看做是一个线程。
我在窗口1购票,你可以在窗口2购票,你不需要等我,我也不需要等你。所以多线程并发可以提高效率。
多核CPU的可以真正的是实现多线程并发,例如4核CPU表示同一个时间点上,可以真正的有4个进程并发执行。
单核的CPU不能够做到真正的多线程并发,但是可以做到给人一种“多线程并发”的感觉,原因是CPU的运行速度很快。对于单核的CPU来说,在某一个时间点上实际上只能处理一件事情,但是由于CPU的处理速度极快,多个线程之间频繁切换执行,给别人的感觉是:多个事情同时在做!!
同时,多线程程序并不是同时进行的,由于CPU的执行速度太快,CPU会在不同的线程之间快速的切换执行,这个现象就是上下文切换,即:CPU从一个线程或进程切换到另一个线程或进程。
public class ThreadTest02 {
public static void main(String[] args) {
// 启动线程
new MyThread().start();
// 直接调用run()方法
// new MyThread().run();
// 主线程运行的程序
for(int i = 0; i < 1000; i++){
System.out.println("主线程--->" + i);
}
}
}
class MyThread extends Thread {
@Override
public void run() {
// 编写程序,这段程序运行在分支线程中(分支栈)。
for(int i = 0; i < 1000; i++){
System.out.println("分支线程--->" + i);
}
}
}
run() 方法不会启动线程,只是普通的调用方法而已,不会分配新的分支栈(这种方式就是单线程)。
start() 方法的作用是:启动一个分支线程,在JVM中开辟一个新的栈空间,这段代码任务完成之后,瞬间就结束了。
因此start()方法只是为了开启一个新的栈空间,只要新的栈空间开出来,start()方法就结束了,线程就启动成功了。
启动成功的线程会自动调用run()方法,并且run()方法在分支栈的栈底部(压栈)。
run方法在分支栈的栈底部,main方法在主栈的栈底部。run和main是平级的。
这种方式相对于第一种方式,只是多了一个线程对象进行初始化,因为Thread的有参构造可以实现,其他的地方没有过多的变化。
/**
* 1. 创建类实现Runnable接口
*/
class MyRunnable implements Runnable{
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("分支线程->" + i);
}
}
}
public class CreateThread {
public static void main(String[] args) {
// 启动线程
new Thread(new MyRunnable()).start();
// 主线程程序
for (int i = 0; i < 100; i++) {
System.out.println("主线程->" + i);
}
/**
* 2. 通过匿名内部类实现
*/
new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println("分支线程->" + i);
}
}).start();
}
}
Callable 接口类似于 Runnable,但是 Runnable 不会返回结果,并且无法抛出经过检查的异常,而 Callable 在不使用线程池的时候依赖 FutureTask 类获取返回结果。
单个线程池: 使用ExecutorService、Callable、Future实现有返回结果的线程。
ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
/**
* 实现Callable接口
*/
class MyCallable implements Callable {
@Override
public Integer call() {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
public class CreateThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 启动线程
new Thread(new FutureTask<>(new MyCallable()), "方式一").start();
// FutureTask futureTask = new FutureTask<>(new MyCallable());
// new Thread(futureTask).start();
// 通过futureTask.get()获取返回值
System.out.println(futureTask.get());
/**
* 通过匿名内部类实现
*/
new Thread(new FutureTask<>(() -> {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}));
// 主线程程序
for (int i = 0; i < 100; i++) {
System.out.println("主线程->" + i);
}
}
}
/**
* 实现Callable接口
*/
class MyCallable implements Callable {
@Override
public Integer call() {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
public class CreateThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 主线程程序
for (int i = 0; i < 100; i++) {
System.out.println("主线程->" + i);
}
/**
* 使用单线程池实现
*/
// 1. 创建固定大小的线程池对象
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 2. 提交线程任务,通过Future接口接受返回的结果
Future submit = executorService.submit(new MyCallable());
// 3. 关闭线程池
executorService.shutdown();
// 4. 调用future.get()获取callable执行完成的返回结果
System.out.println(submit.get());
}
}
新建状态(New):新创建了一个线程对象。
就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于可运行线程池中,变得可运行,等待获取CPU的使用权。
运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
Java线程有优先级,优先级高的线程会获得较多的运行机会,因此通过Thread类的setPriority()和getPriority()方法分别用来设置和获取线程的优先级。
JVM提供了10个线程优先级,但与常见的操作系统都不能很好的映射。如果希望程序能移植到各个操作系统中,应该仅仅使用Thread类有以下三个静态常量作为优先级,这样能保证同样的优先级采用了同样的调度方式。其中,主程序使用的是NORM_PRIORITY,即5,同时还有MAX_PRIORITY=10和MIN_PRIORITY=1的静态优先级常量。
线程在正常的程序中启动和停止,不需要额外的停止方式,会自动停止。但是有些情况下,有一些伺服线程还在运行,他们运行时间较长,只有当外部条件满足时,他们才会停止。针对这样的情况,提供了如下几种停止线程的方式:
public class ThreadStopUse {
public static void main(String[] args) {
FlagStop flagStop = new FlagStop();
new Thread(flagStop).start();
for (int i = 0; i < 100; i++) {
System.out.println("主线程运行的第" + i + "次");
if (i == 90) {
// 调用自己的stop方法切换标志位,停止线程
flagStop.stop();
System.out.println("分支线程该停止了");
}
}
}
}
class FlagStop implements Runnable {
/**
* 定义标志
*/
private volatile boolean exitFlag = true;
/**
* 标志转换
*/
public void stop() {
this.exitFlag = false;
}
@Override
public void run() {
int i = 0;
while (exitFlag) {
System.out.println("分支线程运行的第" + i + "次");
}
}
}
使用interrupted()方法来中断线程有两种情况:
class InterruptedStop implements Runnable {
@Override
public void run() {
for (int i = 0; i <= 200; i++) {
// 判断是否被中断,通过检查标志位
if (Thread.currentThread().isInterrupted()) {
// 处理中断逻辑
break;
}
System.out.println("i=" + i);
}
}
}
public class ThreadStopUse {
public static void main(String[] args) throws InterruptedException {
InterruptedStop interruptedStop = new InterruptedStop();
Thread thread = new Thread(interruptedStop);
thread.start();
Thread.sleep(1000);
thread.interrupt();
}
}
注意:在上面这段代码中,我们增加了 Thread.isInterrupted() 来判断当前线程是否被中断了,如果是,则退出 for 循环,结束线程。
这种方式看起来与之前介绍的“使用标志位终止线程”非常类似,但是在遇到 sleep() 或者 wait() 这样的操作,我们只能通过中断来处理了。
使用Thread.stop()方法来结束线程的运行是很危险的,主要因为在程序调用Thread.stop()后会抛出ThreadDeatherror()错误,并释放子线程所持有的所有锁,会导致被保护数据呈现不一致性,此过程不可控。
线程休眠是Thread.sleep(ms)方法,它的作用是让当前线程进入休眠,进入“阻塞状态”,放弃占有CPU时间片,让给其它线程使用。执行效果就是间隔特定的时间,去执行一段特定的代码,每隔多久执行一次。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。
注意:每个对象都有一个锁,sleep()方法不会释放锁。
public class ThreadSleepUse {
public static void main(String[] args) {
while (true) {
try {
Thread.sleep(1000);
// 每隔一秒打印一下系统当前时间
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
暂停当前正在执行的线程对象,但不阻塞,将线程从运行状态转为就绪状态,把执行机会让给相同或者更高优先级的线程。让CPU重新调度,但是礼让不一定成功,因为当前线程和其他线程一同竞争CPU,使得所有线程回到同一起点,优先级高的线程获得的运行机会会多一点,这个过程不会释放锁。
public class ThreadYieldUse {
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
System.out.println("主线程执行了第" + i);
}
new Thread(() -> {
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName() + "执行了" + i + "次");
if (i % 5 == 0) {
Thread.yield();
System.out.println("线程礼让,重新争抢CPU");
}
}
}, "线程礼让").start();
}
}
线程加入就是在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。
将一个线程合并到当前线程中,当前线程受阻塞,加入的线程执行直到结束,这个是无参join()方法的作用,使用join(long millis)方法则等待该线程终止的时间最长为 millis 毫秒;使用join(long millis, int nanos)方法则等待该线程终止的时间最长为 millis 毫秒 + nanos 纳秒。
作用:一个执行完的线程需要另一个正在执行的线程的运行结果时
public class ThreadJoinUse {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 50; j++) {
System.out.println("VIP线程-" + Thread.currentThread().getName() + "执行了" + j + "次");
}
}, "线程加入");
thread.start();
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + "执行了" + i + "次");
if (i == 50) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象就是线程安全的。
问题:通常情况下,一个进程中的比较耗时的操作(如长循环、文件上传下载、网络资源获取等),往往会采用多线程来解决。又比如实际生活中,银行取钱问题、火车票多个售票窗口的问题,通常会涉及到并发的问题,从而需要多线程的技术。
当进程中有多个并发线程进入一个重要数据的代码块时,在修改数据的过程中,很有可能引发线程安全问题,从而造成数据异常。例如,正常逻辑下,同一个编号的火车票只能售出一次,却由于线程安全问题而被多次售出,从而引起实际业务异常。
线程安全问题产生的原因——共享内存数据,当多个线程同时操作同一共享数据时,导致共享数据出错。
线程、主内存、工作内存三者的关系如图:
在 Java 内存模型中,分为主内存和线程工作内存。每条线程有自己的工作内存,线程使用共享数据时,都是先从主内存中拷贝到工作内存,线程对该变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量,线程使用完成之后再写入主内存。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成。
在多线程环境下,不同线程对同一份数据操作,就可能会产生不同线程中数据状态不一致的情况,这就是线程安全问题的原因。
当多线程并发的环境下,有共享数据,并且这个数据还会被修改,此时就存在线程安全问题,怎么解决这个问题?
要实现线程安全,需要保证数据操作的两个特性:
以上两个特性结合起来,其实就相当于同一时刻只能有一个线程去进行数据操作并将结果写入主存,这样就保证了线程安全,这种机制称为线程同步
线程同步就是线程不能并发,线程必须排队执行,因此线程同步会牺牲一部分的效率,来提升安全性
线程排队执行。(不能并发)。用排队执行解决线程安全问题。
实现方式:
synchronized 是 Java 中的关键字,是一种同步锁。它修饰的对象有以下几种:
一个线程访问一个对象中的 synchronized(this) 同步代码块时,其他试图访问该对象的线程将被阻塞。
public class ThreadSafety {
public static void main(String[] args) {
System.out.println("使用关键字synchronized");
SyncThread syncThread = new SyncThread();
new Thread(syncThread, "SyncThread1").start();
new Thread(syncThread, "SyncThread2").start();
}
}
class SyncThread implements Runnable {
private static int count = 0;
@Override
public void run() {
synchronized (this) {
for (int i = 0; i < 5; i++) {
try {
System.out.println("线程名:" + Thread.currentThread().getName() + ":" + (count++));
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 其他逻辑
}
}
当两个并发线程(thread1 和 thread2)访问同一个对象(syncThread)中的 synchronized 代码块时,在同一时刻只能有一个线程得到执行,另一个线程受阻塞,必须等待当前线程执行完这个代码块以后才能执行该代码块。Thread1 和 thread2 是互斥的,因为在执行 synchronized 代码块时会锁定当前的对象,只有执行完该代码块才能释放该对象锁,下一个线程才能执行并锁定该对象。
注:synchronized 只锁定对象,多个线程要实现同步,所以线程必须以同一个 Runnable 对象为运行对象,即:()中的对象要是同一个
这时如果创建了两个 SyncThread 的对象 syncThread1 和 syncThread2,线程 thread1 执行的是 syncThread1 对象中的 synchronized 代码(run),而线程 thread2 执行的是 syncThread2 对象中的 synchronized 代码(run);我们知道 synchronized 锁定的是对象,这时会有两把锁分别锁定 syncThread1 对象和 syncThread2 对象,而这两把锁是互不干扰的,不形成互斥,所以两个线程可以同时执行。
当一个线程访问对象的一个 synchronized(this) 同步代码块时,另一个线程仍然可以访问该对象中的非 synchronized(this) 同步代码块。
如果 synchronized 作用在静态方法中,修饰一块代码,则称为静态代码块,锁对象是当前类的字节码文件。
class SyncThread implements Runnable {
private static int count = 0;
/**
* synchronized作用在静态方法中,锁对象实当前类的字节码文件
*/
public static void save() {
synchronized (SyncThread.class) {
count++;
}
// 其他操作
}
}
Synchronized 修饰一个方法很简单,就是在方法的前面加 synchronized, synchronized 修饰方法和修饰一个代码块类似,只是作用范围不一样,修饰代码块是大括号括起来的范围,而修饰方法范围是整个函数。
class SyncThread implements Runnable {
private static int account = 100;
/**
* synchronized修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,锁对象为这个方法所在的当前实例对象
* @param money
*/
public synchronized void draw(Integer money) {
account -= money;
}
}
在用synchronized修饰方法时要注意以下几点:
静态方法是属于类的而不属于对象的,synchronized 修饰的静态方法锁定的是这个类的所有对象,该类的所有对象用 synchronized 修饰的静态方法的用的是同一把锁。
效果和 synchronized 修饰静态方法是一样的,synchronized 作用于一个类时,是给这个类加锁,该类的所有对象用的是同一把锁。
从JDK 5.0开始,Java提供了更强大的线程同步机制——通过显式定义同步锁对象来实现同步,Lock 和 ReadWriteLock 是两大锁的根接口,Lock 代表实现类是 ReentrantLock(可重入锁),ReadWriteLock(读写锁)的代表实现类是 ReentrantReadWriteLock。
如果一个代码块被 synchronized 关键字修饰,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待直至占有锁的线程释放锁。
事实上,占有锁的线程释放锁一般会是以下三种情况之一:
以下三种场景只能用 Lock:
public class LockUse {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket).start();
new Thread(ticket).start();
new Thread(ticket).start();
}
}
class Ticket implements Runnable {
private static Integer ticketNums = 10;
/**
* 声明可重入锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 不加锁的情况下,线程不安全,因此可以使用Lock进行显示的加锁和解锁,锁lock必须紧跟try代码块,且unlock要放到finally第一行。
*/
@Override
public void run() {
while (true) {
// 加锁,锁lock必须紧跟try代码块,且unlock要放到finally第一行。
lock.lock();
try {
// lock.lock(); 可以出现在这个位置,但是不建议,因为如果在获取锁时发生了异常,异常抛出的同时也会导致锁无法被释放;
if (ticketNums > 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟买票,票自减
System.out.println(ticketNums--);
}
} finally {
// 必须放到第一行
lock.unlock();
}
}
}
}
场景:两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
分析:
Java提供的解决线程通信问题的方法,即:等待/唤醒机制
注意:以上方法只能在同步方法或者同步代码块中使用,否则抛出异常,IlleagalMonitorStateException
方式:
class Product {
/**
* 产品编号
*/
Integer productId;
public Product(Integer productId) {
this.productId = productId;
}
}
/**
* 生产者
*/
class Production extends Thread {
Buffers buffer;
public Production(Buffers buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
buffer.push(new Product(i));
System.out.println("生产了" + i + "个商品");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者
*/
class Customer extends Thread {
Buffers buffer;
public Customer(Buffers buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
System.out.println("消费了-->" + buffer.commodity().productId + "个产品");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
缓冲区的大小设定之后,设定一个计数器,生产者和消费者通过计数器去进行产品的生产和消费。生产者生产产品时,首先根据计数器去判断缓冲区是否已经满了,满了的话就等待,然后通知消费者进行消费,如果没有满的话,就继续往里面生产产品。消费者消费的时候,也是通过判断缓冲区中是否有产品存在,如果存在的话就消费,否则等待生产者进行生产,整个生产和消费的过程都是针对缓冲区进行的。
class Buffers {
/**
* 设置容器大小,产品最大数量
*/
Product[] product = new Product[10];
/**
* 计数器
*/
private int count = 0;
/**
* 生产者生产品
*
* @param products
* @throws InterruptedException
*/
public synchronized void push(Product products) throws InterruptedException {
// 如果容器满了,就等待消费者消费
if (count == product.length) {
// 通知消费者消费,生产者等待,wait(),表示线程一直等待,直到其它线程通知,与sleep不同,会释放锁
this.wait();
}
// 如果没满,就丢入产品
product[count] = products;
count++;
// 通知消费者进行消费,notify(),唤醒同一个对象上所有调用wait()方法的线程,优先级高的线程优先调度
this.notifyAll();
}
/**
* 消费者消费产品
*
* @return
* @throws InterruptedException
*/
public synchronized Product commodity() throws InterruptedException {
// 判断是否有产品可以消费
if (count == 0) {
// 消费者等待,等待生产者生产
this.wait();
}
// 消费者进行消费
count--;
Product products = product[count];
// 消费完后,通知生产者生产
this.notifyAll();
return products;
}
}
/**
* 生产者:厨师
*/
class Cook extends Thread {
Food food;
public Cook(Food food) {
this.food = food;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
food.make("凉皮");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者:顾客
*/
class Judge extends Thread {
Food food;
public Judge(Food food) {
this.food = food;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
food.eat();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用信号法时,需要设置一个标志位,通过修改标志位的方式,使得生产者和消费者进行协同工作。当标志位为true时,生产者进行生产,消费者等待,所以在制作食物的方法中,首先让消费者(顾客)进行等待,等待生产者(厨师)进行食物制作,厨师制作完成之后,通知顾客吃饭,同时修改标志位。顾客吃饭的时候,厨师等待,顾客吃完后,通知厨师继续做饭,同时修改标志位,使得两个线程有序的进行协同工作。
class Food {
/**
* 设置标志位,true为厨师烹饪食物,顾客等待,false为厨师等待,顾客吃饭
*/
boolean flag = true;
/**
* 食物
*/
String foodName;
/**
* 烹饪食物
*
* @param foodName 食物
* @throws InterruptedException
*/
public synchronized void make(String foodName) throws InterruptedException {
// 如果flag为false则厨师等待顾客吃饭,生产者厨师等待,消费者顾客进行吃饭
if (!flag) {
this.wait();
}
System.out.println("厨师做了一道" + foodName);
// 唤醒消费者消费
this.notifyAll();
// 将厨师做的菜传递给总的菜类
this.foodName = foodName;
// 让flag为false,则消费者消费
this.flag = !this.flag;
}
/**
* 消费者吃饭
*
* @throws InterruptedException
*/
public synchronized void eat() throws InterruptedException {
// flag为true则顾客等待厨师做饭,消费者等待,生产者生产
if (flag) {
this.wait();
}
System.out.println("顾客吃了" + foodName);
// 唤醒,唤醒生产者(厨师)做菜
this.notifyAll();
// 使flag为true,让生产者继续生产
this.flag = !this.flag;
}
}
在面向对象编程中,创建和销毁对象是很费时间的,对于线程来说也是如此,尤其是当线程中执行的是简单任务的话,则大部分的时间都花费在线程的创建和销毁上。
因此为了解决这种资源浪费的情况,使用池化技术——线程池,本质上是一种对象池,用于管理线程资源,对线程进行复用,一个线程执行完当前任务后并不马上销毁,而是从任务队列中取出一个任务继续运行。即在任务执行前,需要从线程池中拿出线程来执行,在任务执行完成之后,需要把线程放回线程池。通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。这种做法提高了线程的利用率,也减少了系统开销。
线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
优点
缺点
线程池的5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
通过上图,我们看到了线程池的主要处理流程。我们的关注点在于,任务提交之后是怎么执行的。大致如下:
Executors是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。
创建单一线程的线程池:ExecutorService newSingleThreadExecutor();这是一个始终都只有一个线程的池子,所有的任务都通过一个线程来执行,若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE),当线程空闲的时候,按照FIFO的方式进行处理。
创建固定数量的线程池:ExecutorService newFixedThreadPool(int nThreads);创建一个具有固定线程数的线程池,当所有线程都在执行任务时,新提交的任务会一直提交到阻塞队列中。若多个任务被提交到此线程池,则会有如下处理过程:
创建带缓存的线程池:ExecutorService newCachedThreadPool();这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素,会根据线程任务的数量来进行线程的创建和释放。
创建定时调度的线程池:ScheduledExecutorService newScheduledThreadPool(int corePoolSize);和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),定时调度,每个调度任务会至少等待period的时间,如果任务执行的时间超过period,则等待的时间为任务执行的时间。scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit),定时调度,第二个任务执行的时间 = 第一个任务执行时间 + delay。schedule(Runnable command, long delay, TimeUnit unit),定时调度,延迟delay后执行,且只执行一次。我们写一个例子来看看如何使用定时调度:
public class ThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 定时调度,每个调度任务会至少等待`period`的时间,如果任务执行的时间超过`period`,则等待的时间为任务执行的时间
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(10000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
// 定时调度,第二个任务执行的时间 = 第一个任务执行时间 + `delay`
executor.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(5000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
// 定时调度,延迟`delay`后执行,且只执行一次
executor.schedule(() -> System.out.println("5 秒之后执行 schedule"), 5, TimeUnit.SECONDS);
}
}
注意: 通过阅读底层源码可以看出,四种常见的线程池都直接或间接的继承自ThreadPoolExecutor类,而《阿里巴巴Java开发手册》中则强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式则必须更加明确线程池的运行规则,从而规避资源耗尽的风险。
理论上,我们可以通过Executors来创建线程池,这种方式非常简单。但正是因为简单,所以限制了线程池的功能。比如:无长度限制的队列,可能因为任务堆积导致OOM,这是非常严重的bug,应尽可能地避免。同时,根据《阿里巴巴Java开发手册》中则强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,因此归根结底,还是需要我们通过更底层的方式来创建线程池。
从Executors的底层实现上不难看出,其中的几个方法都使用了 ThreadPoolExecutor 的默认配置,抛开定时调度的线程池不管,ThreadPoolExecutor最底层的构造方法却只有一个。那么,我们就从这个构造方法着手分析。
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 最长存活时间
TimeUnit unit, // 存活时间单位
BlockingQueue workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) { // 饱和策略
/*
* 使用两个if语句进行参数合法性判断
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
由构造方法可知,ThreadPoolExecutor 类的构造参数总共有7个,我们逐一进行分析。
corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
maximumPoolSize:线程池中的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,直到当前线程数等于maximumPoolSize则停止创建;当阻塞队列是无界队列时,maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入 workQueue(阻塞队列)。
keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。默认情况下,该参数只在线程数大于corePoolSize(核心线程数)时才有用, 超过这个时间的空闲线程将被终止。
unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天等等。
workQueue:等待(阻塞)队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,等待队列默认是 BlockingQueue 类型的,同时JDK内部自带的主要有以下几种:
threadFactory:线程工厂,我们可以使用它来创建一个线程,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory,Executors的实现使用了默认的线程工厂 DefaultThreadFactory。
handler:拒绝策略(饱和策略),当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。即:如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。
四种拒绝策略各有优劣,比较常用的是DiscardPolicy,但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler接口的方式。
简单实现
public class ThreadPool {
public static void main(String[] args) {
/**
* 初始化一个指定的线程池,核心线程2个,最大线程5个,销毁时间1秒,阻塞队列使用ArrayBlockingQueue
*/
ExecutorService executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute is called:调用执行之前被调用");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute is called:调用执行之后被调用");
}
@Override
protected void terminated() {
System.out.println("terminated is called:终止调用");
}
};
// 提交任务
executor.submit(() -> System.out.println("this is a task"));
// 关闭线程池
executor.shutdown();
}
}
ExecutorService总共提供了两种任务提交的方法,分别是execute()方法和submit()方法,主要区别如下:
public class TaskSubmit {
public static void main(String[] args) {
// 创建线程池
ExecutorService executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
// 只能提交Runnable任务
executor.execute(() -> System.out.println("execute()方法只能提交Runnable任务"));
// 既可以提交Runnable任务,又可以提交Callable任务,只是前者返回null,后者返回值
Future callableFuture = executor.submit(() -> 1 + 1);
Future runnableFuture = executor.submit(() -> System.out.println("Runnable任务会返回null"));
try {
// 只有获取返回值的时候才需要处理异常
System.out.println(callableFuture.get());
System.out.println(runnableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
ExecutorService提供了shutDown()和shutDownNow()两个函数来关闭线程池,底层还是通过逐个调用线程的interrupt()函数来实现中断线程从而关闭线程池的。
shutdown函数会把线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。(即将当前所有线程任务执行完毕再销毁线程池)
shutdownNow方法会先将线程池状态修改为STOP,然后调用线程池里的所有线程的interrupt方法,并把工作队列中尚未来得及执行的任务清空到一个List中返回,getTask()方法返回null,从而线程退出 。但是ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。(即直接销毁线程池,不会考虑是否有线程任务再执行)
__EOF__
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PTDC7rbd-1656132206290)(https://blog.csdn.net/mmgmj)]静守己心、笑淡浮华 - 本文链接: https://blog.csdn.net/mmgmj/p/16401877.html