同步和异步通常用来形容一次方法的调用
它们都可以代表两个或多个任务一起执行。
临界区用来表示一种公共资源或者共享资源,可以被多个线程使用。但是每一次只能有一个线程使用它,一旦临界区资源被占用,其他要使用这个资源的线程必须等待。
死锁、饥饿、活锁都是线程的活跃性问题。如果线程发生了上面的几种情况,那么线程可能就不在活跃了,也就是说它们很难再继续执行下去了。
由于临界区的存在,多线程之间的并发必须受到控制。根据控制并发的策略,我们可以把并发的级别分为阻塞、无饥饿、无障碍、无锁、无等待几种。
一个线程是阻塞的,那么在其他线程释放资源之前,当前线程无法执行,当我们使用sychronized和重入锁时,我们得到的就是阻塞线程。
如果线程之间时有优先级的,那么线程的调度总是满足高优先级线程。也就是说,对于同一个资源的分配是不公平的。
无障碍是一种最弱的非阻塞调度。两个线程如果无阻塞的执行,那么不会因为临界区的问题导致一方被挂起。换而言之,大家都可以无阻碍的进入临界区,一起修改共享数据,但把数据修改坏了怎么办?对于无障碍的线程来说,一旦发现这种情况,它会立即对自己所做的修改进行回滚,确保数据安全。
无锁的并行都是无障碍的。在无锁的情况下,所有的线程都能尝试对临界区进行访问,但不同的是,无锁的并发保证必然有一个线程能在有限步内完成操作离开临界区
无锁只要求有一个 线程在有限步内完成操作,而无等待则在无锁的基础上进一步拓展。它要求所有的线程都要在有限步内完成操作,这样就不会引起饥饿的情况。
它定义了串行系统并行后的加速比的计算公式和理论上限
加速比 = 优化前系统耗时 / 优化后系统耗时
n:处理器数量,T:时间,T1:优化前耗时,Tn:优化后耗时,F:程序中只能串行执行的比例。

50%的代码串行执行,那么系统的最大加速比为2。

加速比 = 1/(0.6+((1-0.6)/2))=1.67Gustafson定律也试图说明处理器个数、串行化比例和加速比之间的关系。

从Gustafson定律中可以看出,如果串行化比例很小,并行化比例较大,那么加速比就是处理器个数,只有无限累加处理器,那么就可以获得更快的速度。
Amdahl: 当系统的串行比例一定时,加速比是有上限的,无论加多少个处理器都无法突破加速比的极限
Gustafson: 如果并行化的代码所占的比例足够大,那么加速比就能随着处理器数量线性增长。
Thread thread = new Thread();
注意:启动线程应该调用thread.start(),而不是调用thread.run(),因为后者只是调用了一个普通方法,而没有启动线程。
Thread.isInterrupt() 对线程状态进行判断,进而进行对线程的进一步处理。而在当前线程(thread)处于sleep的时候被其他线程调用了thread.interrupt(),则会抛出异常InterruptedExceptionobject的方法。Object.wait()和Object.notify()必须包含着sychronized同步块中。wait会释放锁,而sleep不会释放锁。object.wait()则该线程会进入object的等待队列。这个等待队列中可能会有很多个线程等待同一个对象。当object.notify()被调用的时候,它会在等待队列中随机唤醒一个线程,这个选择是完全随机的。activeCount()方法可以获得活动线程总数,但由于线程是动态的,所以这个数是一个估计值,无法精确。list()可以打印线程组中的线程信息,对调试有一定帮助。package ThreadStudy;
public class ThreadGroupTest implements Runnable{
public static void main(String[] args) {
ThreadGroup threadGroup = new ThreadGroup("PrintGroup");
Thread t1 = new Thread(threadGroup,new ThreadGroupTest(),"T1");
Thread t2 = new Thread(threadGroup,new ThreadGroupTest(),"T2");
t1.start();
t2.start();
System.out.println(threadGroup.activeCount());
threadGroup.list();
}
@Override
public void run() {
String name = "Group name"+Thread.currentThread().getThreadGroup().getName() +
"My name : " + Thread.currentThread().getName();
while(true){
System.out.println(name);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
垃圾回收线程,JIT线程等。setDaemon()把线程设置为守护线程。且一定要防止start()之前,如果放在start()之后,那么该线程会被当作用户线程,而且你会得到以下报错。Exception in thread "main" java.lang.IllegalThreadStateExceptionpackage ThreadStudy;
public class ThreadDaemonTest extends Thread {
@Override
public void run() {
while(true){
System.out.println("I am Alive");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Thread t = new ThreadDaemonTest();
t.setDaemon(true);
t.start();
Thread.sleep(2000);
}
}
package ThreadStudy;
public class ThreadPriorityTest {
public static class HighPriority extends Thread{
static int count = 0;
@Override
public void run() {
while (true){
synchronized (ThreadPriorityTest.class){
count++;
if (count>10000) {
System.out.println("HighPriority---End");
break;
}
}
}
}
}
public static class LowPriority extends Thread{
static int count = 0;
@Override
public void run() {
while (true){
synchronized (ThreadPriorityTest.class){
count++;
if (count>10000){
System.out.println("LowPriority---End");
break;
}
}
}
}
}
public static void main(String[] args) throws Exception{
Thread highPriority = new HighPriority();
Thread lowPriority = new LowPriority();
//设置线程优先级
highPriority.setPriority(10);
lowPriority.setPriority(1);
//开启线程
lowPriority.start();
highPriority.start();
}
}
(1)、指定加锁对象
指定加锁对象为o,每次进入synchronized包裹的代码块就会要求请求instance的锁。
public static class LowPriority extends Thread{
static int count = 0;
Object o = new Object();
@Override
public void run() {
while (true){
synchronized (o){
count++;
if (count>10000){
System.out.println("LowPriority---End");
break;
}
}
}
}
}
(2)、直接作用于实例方法
public static class LowPriority implements Runnable{
static LowPriority instance = new LowPriority ();
public synchronized void increase(){
}
@Override
public void run() {
while (true){
increase();
}
}
public static void main(String[] args) throws Exception{
Thread t1 = new Thread (instance);
Thread t2 = new Thread (instance);
t1.start();t2.start()
}
}
public static void main(String[] args) throws Exception{
Thread t1 = new Thread (new LowPriority());
Thread t2 = new Thread (new LowPriority());
t1.start();t2.start()
}
public static synchronized void increase(){
}
重入锁顾名思义就是可以重复进入的锁,一个线程是可以多次获得锁的,如果像以下代码获得了两次锁,那么释放锁的时候就得释放两次,否则其他线程也无法获得该锁,也就无法进入临界区。
package ThreadStudy;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
for (int i=0 ; i<100000 ; i++){
lock.lock();
lock.lock();
try {
i++;
}finally {
lock.unlock();
lock.unlock();
}
}
}
}
对于关键词sychronized来说,如果一个线程在等待锁,那么就只有两种情况,一种就是获得锁继续执行下去,要么就是继续等待锁。而使用重入锁,则提供另一种可能,就是线程可以被中断。也就是线程在等待锁得时候,可以根据需求取消对锁的请求。例如:如果一个线程在等待锁,那么它可以收到一个通知,被告知无需等待,可以停止工作了。这种情况对于处理死锁是有一定帮助的。
package ThreadStudy;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockInterruptTest implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
ReentrantLockInterruptTest(int lock){
this.lock = lock;
}
@Override
public void run() {
try {
if (lock == 1){
lock1.lockInterruptibly();
Thread.sleep(500);
lock2.lockInterruptibly();
}else {
lock2.lockInterruptibly();
Thread.sleep(500);
lock1.lockInterruptibly();
}
}catch (Exception e){
e.printStackTrace();
}finally {
if (lock1.isHeldByCurrentThread()){
lock1.unlock();
}else {
lock2.unlock();
}
System.out.println(Thread.currentThread().getId()+":线程退出!");
}
}
public static void main(String[] args) throws Exception{
ReentrantLockInterruptTest lock1 = new ReentrantLockInterruptTest(1);
ReentrantLockInterruptTest lock2 = new ReentrantLockInterruptTest(2);
Thread thread1 = new Thread(lock1);
Thread thread2 = new Thread(lock2);
thread1.start();thread2.start();
Thread.sleep(2000);
//中断一个线程
thread2.interrupt();
}
}
线程1和线程2抢占lock1和lock2,由于线程1先获得lock1,线程2先获得lock2,导致两个线程处于死锁状态,在main方法的最后一行对线程2进行中断操作,那么线程2就释放了lock2,线程1获得了lock2得以正常执行完成,程序执行完成之后控制台有以下输出。
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at ThreadStudy.ReentrantLockInterruptTest.run(ReentrantLockInterruptTest.java:24)
at java.lang.Thread.run(Thread.java:748)
12:线程退出!
11:线程退出!
通常,我们无法判断为什么一个线程无法获得锁,可能是死锁,也可能是饥饿,如果给定一个时间,让线程自动放弃是有意义的。
package ThreadStudy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantTryLockTest implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if (lock.tryLock(5, TimeUnit.SECONDS)){
Thread.sleep(6000);
}else {
System.out.println("获取锁失败!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantTryLockTest test = new ReentrantTryLockTest();
Thread t1 = new Thread(test);
Thread t2 = new Thread(test);
t1.start();
t2.start();
}
}
lock.tryLock()接收两个参数,第一个参数是等待时间,第二个参数是等待时间单位。如果线程在该时间内获得了锁,则返回true,否则返回false。
大多数情况下,锁的获得都是非公平的,但公平锁会按照请求锁的时间先后顺序,保证先到先得,后到后得,公平锁最大的一个特点就是:不会产生饥饿现象。如果我们使用synchronized关键词进行锁控制,那么产生的锁就是非公平的。
重入锁允许我们对其公平性进行设置:
public ReentrantLock(boolean fair) {}
当参数fair为true的时候,表示锁是公平的。但要实现公平锁必须维护一个有序队列,因此公平锁的实现成本较高,并且效率低下,所以一般锁都是非公平的。
就重入锁的实现来看,主要包含三个要素:
Condition和Object.notify(),Object.wait()的作用是一样的,Object的方法是搭配synchronized关键词使用的,而Condition是配合重入锁使用的。
从广义上讲,信号量是对锁的拓展。无论是内部锁synchronized,还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量可以指定多个线程,同时访问某一个资源。
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair) {}
在构造信号量对象的时候,必须指定信号量的准入数(许可量)。
public void acquire()//线程尝试获取许可,若无法获得,则等待,直到有线程释放许可或者当前线程中断
public void acquireUninterruptibly()//和acquire类似,但不响应中断
public boolean tryAcquire()//尝试获取许可,获取到则true,否则false,不会等待,立即返回。
public void release(int permits)//线程访问资源结束后释放资源。
package ThreadStudy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest implements Runnable {
final Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+":done!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemaphoreTest test = new SemaphoreTest();
for (int i=0 ; i<20 ; i++){
executorService.execute(test);
}
}
}
申请信号量的时候使用acquire(),离开的时候使用release()。
例如:线程W1、W2、W3进行写操作,线程R1、R2、R3进行读操作,如果使用内部锁或者重入锁,理论来说所有的读写、读读、写写之间都是串行操作。当R1进行读操作的时候,R2、R3则需要等待锁,由于读操作并不会对数据的完整性造成破坏,这种等待则是完全不合理的。
所以,读写锁允许多个线程同时读,R1、R2、R3之间是真正的并行。但是,考虑到数据的完整性,读写和写写之间仍然是需要相互等待和持有锁的。
读写锁的访问约束情况
| 读 | 写 | |
|---|---|---|
| 读 | 不阻塞 | 阻塞 |
| 写 | 阻塞 | 阻塞 |
package ThreadStudy;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLock {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock writeLock = readWriteLock.writeLock();
private static Lock readLock = readWriteLock.readLock();
private int value;
public Object handleRead (Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
return value;
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
return null;
}
public void handleWrite(Lock lock,int value){
try {
lock.lock();
Thread.sleep(1000);
this.value = value;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReadWriteLock();
Runnable read = new Runnable() {
@Override
public void run() {
readWriteLock.handleRead(lock);
// readWriteLock.handleRead(readLock);
}
};
Runnable write = new Runnable() {
@Override
public void run() {
readWriteLock.handleWrite(lock,new Random().nextInt());
// readWriteLock.handleWrite(writeLock,new Random().nextInt());
}
};
for (int i=0 ; i<18 ; i++){
new Thread(read).start();
}
for (int i=0 ; i<2 ; i++){
new Thread(write).start();
}
}
}
以上代码进行注释的地方使用的是读写锁,而没注释的地方使用的是重入锁,下面两个for循环模拟18个线程读和2个线程写操作。当使用读写锁的时候,代码耗时2秒多,因为读操作完全并行。而使用重入锁的时候,代码耗时20秒左右,因为读操作和写操作都是串行操作。
主线程等待所有检查线程执行完成之后再进行执行。
package ThreadStudy;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest implements Runnable{
final static CountDownLatch end = new CountDownLatch(10);
final static CountDownLatchTest demo = new CountDownLatchTest();
@Override
public void run() {
try {
//模拟处理事件
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i=0 ; i<10 ; i++){
service.submit(demo);
}
//等待检查
end.await();
System.out.println("check----end");
service.shutdown();
}
}
out of memory的错误,即使没有,大量的线程回收也会给GC带来很大的压力,延长GC的停顿时间。package ThreadStudy.ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedPool {
public static class Fixed implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+"Thread ID:"+
Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Fixed t = new Fixed();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i=0 ; i<10 ; i++){
executorService.submit(t);
}
}
}
1658471537034Thread ID:11
1658471537034Thread ID:12
1658471537034Thread ID:13
1658471537034Thread ID:14
1658471537034Thread ID:15
1658471538034Thread ID:15
1658471538034Thread ID:14
1658471538034Thread ID:11
1658471538034Thread ID:13
1658471538034Thread ID:12
在main函数的第二行创建了一个固定大小为5的线程池,使用for循环依次向线程池提交了10个任务,此后,线程池就会安排调度这10个任务。看控制台输出,很明显就是前5个和后5个线程的执行时间相差1s,并且两者线程id是相同的,说明这10个任务是分成两个批次执行的,也完全符合一个线程池5个线程的行为。
package ThreadStudy.ThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SheduleThread {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
//如果前面的任务没有完成,则后面的调度也不会启动
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()/1000);
}
},0,2, TimeUnit.SECONDS);
}
}
1658474036
1658474038
1658474040
1658474042
1658474044
可以看到线程的执行时间间隔2秒,这里有一个有意思的问题,假如线程的执行时间超过了调度时间会怎么样。我们把Thread.sleep(1000)修改成Thread.sleep(8000),控制台打印如下:
1658475169
1658475177
1658475185
也就是说,周期太短,那么任务就会在上一个任务结束后立即被调用。
对于核心的线程池,无论newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool(),虽然看起来创建的线程有不同的特点,但内部实现均使用了ThreadPoolExecutor()类
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
参数workQueue指被提交但未执行的任务队列,它是BlockingQueue接口,仅用于存放Runnable接口类型。根据队列功能分类,在ThreadPoolExecutor类的构造函数中,可以使用以下几种BlockingQueue队列:
public ArrayBlockingQueue(int capacity),ArrayBlockingQueue的构造函数得带一个参数,用来表示该队列的最大容量。:当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于 corePoolsize ,则会优先创建新的线程,若大于 corePoolsize ,则会将新任务加入等待队列。若等待队列己满,无法加入,则在总线程数不大于 maximumPoolsize 的前提下,创建新的进程执行任务。若大于 maximumPoolsize ,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到。 corePoolsize 以上,换言之,除非系统非常繁忙,否则要确保核心线程数维持在 corePoolsize 。JDK内置策略有以下四个:
以上内置的策略均实现了RejectedExecutionHandler接口。若以上的接口无法满足需求,那么完全可以自己实现RejectedExecutionHandler接口。
package ThreadStudy.ThreadPool;
import java.util.concurrent.*;
public class rejectThreadPoolDemo {
public static class MyTesk implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+"Thread ID:"
+Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTesk tesk = new MyTesk();
ThreadPoolExecutor executors = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is discard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executors.submit(tesk);
Thread.sleep(10);
}
}
}
1659081307821Thread ID:13
1659081307821Thread ID:12
1659081307821Thread ID:11
1659081307836Thread ID:14
1659081307852Thread ID:15
java.util.concurrent.FutureTask@66d3c617is discard
java.util.concurrent.FutureTask@63947c6bis discard
Thread newThread(Runnable r);
自定义线程池可以帮我们做很多事情。比如:
package ThreadStudy.ThreadPool;
import java.util.concurrent.*;
public class ThreadFactoryTest {
public static class MyTesk implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+"Thread ID:"
+Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTesk tesk = new MyTesk();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("Create :"+t);
return t;
}
});
for (int i = 0; i < 5; i++) {
es.submit(tesk);
}
Thread.sleep(2000);
}
}
假如我们需要对线程池做一些扩展,比如,监控每个任务的开始和结束时间,或者其他一些自定义的增强功能,那应该怎么做。其实,ThreadPoolExecutor是一个可扩展的线程池,它提供了beforeExecute、afterExecute、afterExecute三个接口来对线程池进行控制。在默认的ThreadPoolExecutor中,提供了空的beforeExecute、afterExecute的实现。
package ThreadStudy.ThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExtThreadPool {
public static class MyTask implements Runnable{
String name;
MyTask(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行"+"Thread ID :"
+Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行--" + ((MyTask)r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成---" + ((MyTask)r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出---");
}
};
for(int i=0 ; i<5 ; i++){
MyTask task = new MyTask("MyTask"+i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
以上代码中我们可以看出,我们重写了ThreadPoolExecutor的三个方法,用于记录一个任务的开始、结束和整个线程池的退出。在提交完成后,使用shutdown方法关闭线程池。
线程池大小对系统的性能有一定的影响,过大或过小都无法发挥最优的系统性能。一般来说,确定线程池的大小需要考虑CPU数量和内存大小。
Ncpu = CPU数量
Ucpu = CPU的使用率 0 ≤ Ucpu ≤ 1
W/C 等待时间和计算时间的比值
为保持处理器达到预期的使用率,最优的线程池大小为:
Nthread = Ncpu * Ucpu *(1+W/C)
在java中,可以使用以下代码获取CPU数量
Runtime.getRuntime().availableProcessors()
package ThreadStudy.ThreadPool;
import com.sun.org.apache.xpath.internal.operations.Div;
import java.sql.Time;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DivTask implements Runnable {
int a,b;
DivTask(int a , int b){
this.a = a;
this.b = b;
}
@Override
public void run() {
double result = a/b;
System.out.println(result);
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE
, 0L,TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>());
for (int i = 0; i < 5; i++) {
DivTask div = new DivTask(100,i);
pools.submit(div);
}
}
}
输出:
100.0
50.0
33.0
25.0
//弃用
pools.submit(Thread)
//使用
pools.execute(Thread)
Future re= pools.submit(Thread);
re.get();
package ThreadStudy.ThreadPool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
super.execute(wrap(command,clientTrace()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(wrap(task,clientTrace()));
}
private Exception clientTrace(){
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task , final Exception clientStack){
return new Runnable() {
@Override
public void run() {
try {
task.run();
}catch (Exception e){
clientStack.printStackTrace();
throw e;
}
}
};
}
}
public static void main(String[] args) {
ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE
, 0L,TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>());
for (int i = 0; i < 5; i++) {
DivTask div = new DivTask(100,i);
pools.execute(div);
}
}
java.lang.Exception: Client stack trace
at ThreadStudy.ThreadPool.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:24)
at ThreadStudy.ThreadPool.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:15)
at ThreadStudy.ThreadPool.DivTask.main(DivTask.java:33)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at ThreadStudy.ThreadPool.DivTask.run(DivTask.java:21)
at ThreadStudy.ThreadPool.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:33)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
100.0
50.0
33.0
25.0
Process finished with exit code 0
分而治之简单来说就是,你要处理1000条数据,但是你不具备处理1000条数据的能力,那么你可以只处理其中的10条,然后分阶段处理100次,再把这100次的结果合成,最终结果就是对1000个数据处理后的结果。
除了JDK内置线程池外,Guava对线程池也进行了一定的扩展,主要体现在MoreExecutors工具类中。
通过以下方式产生的HashMap就是线程安全的。
Collections.synchronizedMap(new HashMap<>());
通过mutex进行互斥操作。
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
虽然该类可以满足线程安全的要求,但是对Map的读取和写入,都需要获取mutex锁,这会导致所有对Map的操作都会进入等待状态。而ConcurrentHashMap对并发进行了性能优化,因此更适合多线程场景。
ArrayList和Vector都使用数组实现的,ArrayList 不是线程安全,Vector是线程安全的。LinkedList使用链表实现了List,但也不是线程安全,参考Collections.synchronizedMap(),我们可以使用Collections.synchronizedList()来包装List
Collections.synchronizedList(new ArrayList<>());
CopyOnWriteArrayList的读操作之间不需要加锁,并且写也不会阻塞读操作。因为它在进行写操作的时候,创建了一个新的数组进行写操作,写完之后再用这个修改过的副本去覆盖原来的数据。public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
private E get(Object[] a, int index) {
return (E) a[index];
}
/**
* {@inheritDoc}
*
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public E get(int index) {
return get(getArray(), index);
}
如何进行多线程中的数据共享呢?比如,线程A想给线程B发一条消息,用什么方式更合理呢?
当我们希望线程A 通知线程B,但又不希望线程A 知道线程B的存在。这样,如果将来进行重构或者升级,我们完全不用修改A线程,直接把线程B升级成线程C,保证系统的平滑过渡。而这中间的"箱子"就可以用BlockingQueue来实现。
BlockingQueue是一个接口,以下是它的实现类。

ArrayBlockingQueue是用数组实现的,它适合做有界队列,因为队列的最大容量需要在队列初始化的时候指定(数组的动态扩展不太方便)
LinkedBlockingQueue使用链表实现的,它适合做无界队列,因为队列内部的元素可以动态增加,它不会因为初始容量很大,而占据一大半的内存。
Blocking是阻塞的意思,当服务线程(指不断获取队列中的消息,进行处理的线程)处理完队列中的所有消息的时候,它如何知道下一条消息何时到来呢?
一种最简单的方式是线程按照一定时间间隔不停循环和监控这个队列,这是一种可行的方式,但造成了不必要的资源浪费,而且时间间隔也难以确定。而BlokingQueue解决了这个问题,它会让服务线程在队列为空时进行等待,当有新的消息进入队列后,自动将线程唤醒。
| 方法名 | 作用 |
|---|---|
| offer() | 队列满了返回false,否则进行正常的入队操作 |
| put() | 队列满了一直等待,否则入队 |
| poll() | 队列为空返回null |
| take() | 队列为空一直等待元素入队 |
可以用来快速查找的数据结构,有点类似于平衡树,他们都可以对元素进行快速查找。但对平衡树的插入和删除操作往往需要对平衡树进行一次整体的重排,而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。在高并发的情况下,你需要一个全局锁来保证整个平衡树的安全。而对于跳表来说,只需要使用部分锁就行。

底层链表维护了所有的数据,每上面一层都是下面一层的子集,一个元素插入哪层是完全随机的。

使用跳表实现Map和使用哈希算法实现Map的另一个不同之处是:哈希并不会保存元素的顺序,而跳表会。实现这一数据结构的是ConcurrentSkipListMap
跳表的内部实现由几个关键的数据结构组成。

使用CAS

Index,内部包装了Node,同时增加了向下的引用和向右的引用。

HeadIndex,表示链表的头一个Index,它继承Index

JMH(Java Microbenchmark Harness)