第一节 synchronized关键字详解-偏向锁、轻量级锁、偏向锁、重量级锁、自旋、锁粗化、锁消除
第二节 AQS抽象队列同步器原理详解
第三节 AQS之ReentrantLock特性和源码分析及CAS和LockSupport的核心原理
BlockingQueue——是JUC包提供用于解决并发生产者与消费者问题的类,具有在任意时刻只有一个线程可以进行take或者put方法的特性,即执行take和put方法时阻塞,还提供了超时机制,常用于解耦,在很多的生产与消费场景中可以看见,例如需要在一个系统内实现多任务并发执行,可将任务放入阻塞队列存放,多线程进行消费执行。
下面是以一个工厂生产包子,然后进行包装消费的案例
包子类
package com.xj.queue;
//包子类
public class Bun {
String id;//包子编号
int weight;//包子重量
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
}
包子制作工人线程
package com.xj.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
//包子制作者
public class BunProducer implements Runnable{
static Logger log = LoggerFactory.getLogger(BunProducer.class);
private BlockingQueue<Bun> queue;
List<Bun> bucket;
public BunProducer(BlockingQueue<Bun> queue,List<Bun> bucket) {
this.queue = queue;
this.bucket = bucket;
}
@Override
public void run() {
while (true){
try{
if(BunStarter.producer_counter.get() >= BunStarter.bunNumber){
log.info("制作工人:{},本次需要生产的包子数量达到{},将停止生产",Thread.currentThread().getName(),BunStarter.bunNumber);
break;
}
Bun bun = new Bun();
bun.setId(UUID.randomUUID().toString());
bun.setWeight(new Random().nextInt(10) + 95);//重量在:95-104
if(bun.getWeight() >= BunStarter.qualifiedWeight){
this.queue.put(bun);
int count = BunStarter.producer_counter.incrementAndGet();
log.info("制作工人:{},包子ID:{},重量:{}合格,投入生产线队列进行包装,目前生产总额为{}",Thread.currentThread().getName(),bun.getId(),bun.getWeight(),count);
}else {
log.info("制作工人:{},包子ID:{},重量:{}不合格,将放入不合格桶",Thread.currentThread().getName(),bun.getId(),bun.getWeight());
bucket.add(bun);
}
}catch (InterruptedException e){
log.info("制作工人:{},被中断,将停止包子制作",Thread.currentThread().getName());
break;
}
}
}
}
包子消费工人线程
package com.xj.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
//包子包装者
public class BunConsumer implements Runnable {
static Logger log = LoggerFactory.getLogger(BunConsumer.class);
private BlockingQueue<Bun> queue;
public BunConsumer(BlockingQueue<Bun> queue) {
this.queue = queue;
}
@Override
public void run() {
while(true){
try{
if(BunStarter.consumer_counter.get() >= BunStarter.bunNumber){
log.info("包装工人:{},本次需要包装的包子数量达到{},将停止包装",Thread.currentThread().getName(),BunStarter.bunNumber);
break;
}
//Bun bun = queue.take();//会一直阻塞
Bun bun = queue.poll(10, TimeUnit.SECONDS);//阻塞10s
if(bun != null){
log.info("包装工人:{},包子ID:{},开始进行包装,目前包装总额为{}",Thread.currentThread().getName(),bun.getId(),BunStarter.consumer_counter.incrementAndGet());
}
}catch (InterruptedException e) {
log.info("包装工人:{},被中断,将停止包子包装",Thread.currentThread().getName());
break;
}
}
}
}
包子生产条线启动器
package com.xj.queue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
//包子生产条线启动器
public class BunStarter {
//包子制作工人数目
public static final int bunProducerNum = 10;
//包子包装工人数目
public static final int bunConsumerNum = 10;
//需要生产进行包装的包子总个数
public static final int bunNumber = 5000;
//定义包子重量不足100g为不合格产品
public static final int qualifiedWeight = 100;
//生产包子计数器,原子操作
public static AtomicInteger producer_counter = new AtomicInteger();
//消费包子计数器,原子操作
public static AtomicInteger consumer_counter = new AtomicInteger();
public static void main(String[] args) {
//储存包子的队列大小容量为100(类似流水线的长度)
BlockingQueue<Bun> bunQueue = new ArrayBlockingQueue<Bun>(100);
//存放所有不合格包子产品,保证线程安全
List<Bun> bucket = Collections.synchronizedList(new ArrayList<Bun>());
//多个包子制作工人
for (int i = 1; i <= bunProducerNum; i++) {
new Thread(new BunProducer(bunQueue,bucket)).start();
}
//多个包子包装工人
for (int i = 1; i <= bunConsumerNum; i++) {
new Thread(new BunConsumer(bunQueue)).start();
}
}
}
结果打印:

利用Lock锁的Condition通知机制进行阻塞控制,一把ReentrantLock锁、两个条件(notEmpty、notFull)。
构造阻塞队列
new ArrayBlockingQueue<Bun>(100);
使用的是ReentrantLock非公平锁以及条件Condition
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//队列数组
this.items = new Object[capacity];
//非公平锁
lock = new ReentrantLock(fair); //公平,非公平
//两个条件队列notEmpty和notFull
notEmpty = lock.newCondition();//队列不空
notFull = lock.newCondition();//队列不满
}
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//循环判断队列是否满了
//为什么要使用while,而不是if判断呢?是为了防止虚假唤醒
/*if只会判断一次,如果条件满足,也就阻塞一次;使用while可以在唤醒之后继续判断条件
*/
while (count == items.length)
//在notFull条件上等待,并释放锁
notFull.await();
// 入队
enqueue(e);
} finally {
//解锁
lock.unlock();
}
}
enqueue(e)方法操作入队逻辑
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//放入数组
items[putIndex] = x;
//环形数组:putIndex指针到数组尽头了,返回头部
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒notEmpty条件队列
notEmpty.signal();
}
signal方法将条件等待队列中
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//判断头节点是否为空
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);//从条件队列转移到同步队列成功,退出循环,反之则继续调用transferForSignal方法
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//将node节点的状态修改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//入同步等待队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
await方法在接口Condition定义,在AQS里实现
public final void await() throws InterruptedException {
//如果线程被中断,抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
//将线程加入条件等待队列
Node node = addConditionWaiter();
//释放当前锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//循环判断node节点是否在同步等待队列
while (!isOnSyncQueue(node)) {
//阻塞
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//节点线程再次去申请锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter方法添加到条件等待队列
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
//t指向尾结点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果尾结点不为空并且waitStatus信号量不是condition
if (t != null && t.waitStatus != Node.CONDITION) {
//清除掉不符合条件的节点
unlinkCancelledWaiters();
//t继续指向尾结点
t = lastWaiter;
}
//构造条件节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//尾结点t为空,头节点firstWaiter指针指向当前节点
if (t == null)
// 初始化队列
firstWaiter = node;
else
//尾结点t的下一个节点nextWaiter指针指向当前节点
t.nextWaiter = node;
//尾结点指针lastWaiter指向当前节点
lastWaiter = node;
return node;
}
unlinkCancelledWaiters方法:遍历等待队列,将非CONDITION状态到的节点移除
private void unlinkCancelledWaiters() {
//t指针指向头节点
Node t = firstWaiter;
//记录遍历进度节点的指针
Node trail = null;
//如果头节点不为空
while (t != null) {
//遍历等待队列,保存当前t的下一个节点对象
Node next = t.nextWaiter;
//如果当前t节点的状态不是为CONDITION
if (t.waitStatus != Node.CONDITION) {
//将t节点移除队列,t节点的nextWaiter指针指向null
t.nextWaiter = null;
//第一次遍历,进度节点为null
if (trail == null)
//头节点指向被移除节点的下一个节点
firstWaiter = next;
else
//进度节点不为null,则指向下一个节点
trail.nextWaiter = next;
//如果next为空,表明队列遍历完成,将尾指针指向进度节点
if (next == null)
lastWaiter = trail;
}
else //如果当前t节点的状态是CONDITION
//保存进度节点
trail = t;
//指针t指向下一个节点
t = next;
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//如果队列为空
while (count == 0)
notEmpty.await();//在notEmpty条件上等待,并释放锁
//出队
return dequeue();
} finally {
lock.unlock();// 解锁,唤醒其它线程
}
}
dequeue出队方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出takeIndex位置的元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; // 环形数组,takeIndex 指针到数组尽头了,返回头部
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//notFull条件队列转同步队列
return x;
}
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
//无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
//有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
//容量,指定容量就是有界队列
private final int capacity;
//元素数量
private final AtomicInteger count = new AtomicInteger();
//头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//take锁,锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
//notEmpty条件
//当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
//put锁,锁分离,提高效率
private final ReentrantLock putLock = new ReentrantLock();
//notFull条件
//当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();
//单链表Node元素
static class Node<E> {
E item; //存储元素
Node<E> next; //后继节点 单链表结构
Node(E x) { item = x; }
}
public void put(E e) throws InterruptedException {
//元素为null,抛出空指针异常
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
//构建Node节点
Node<E> node = new Node<E>(e);
//put操作锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加put锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//判断元数个数是否满了
while (count.get() == capacity) {
notFull.await();//释放锁,当前线程加入条件等待队列
}
//链表入队
enqueue(node);
//获取元数个数后,原子加1
c = count.getAndIncrement();
//如果少于容量
if (c + 1 < capacity)
//可唤醒生产者线程(唤醒阻塞在notFull条件上的线程),从条件等待队列转移到同步等待队列,继续生产入队,等待unlock释放锁
notFull.signal();
} finally {
//put锁释放,真正唤醒生产者线程
putLock.unlock();
}
//如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
if (c == 0)
signalNotEmpty();
}
//存节点从last进
private void enqueue(Node<E> node) {
//尾节点的下一个节点指向node节点,尾节点last指向入队元素
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();// 加take锁
try {
// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
notEmpty.signal();
} finally {
takeLock.unlock(); // 真正唤醒消费者线程
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加take锁
takeLock.lockInterruptibly();
try {
//队列元素个数为0
while (count.get() == 0) {
//在notEmpty条件上阻塞并释放锁,加入到条件等待队列
notEmpty.await();
}
//出队
x = dequeue();
//获取元数个数后,原子减1
c = count.getAndDecrement();
//如果取之前队列元素个数大于1
if (c > 1)
//notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
notEmpty.signal();
} finally {
//真正唤醒消费者线程
takeLock.unlock();
}
// 如果取之前队列元素个数等于容量(表示已满),则唤醒阻塞在notFull的线程
if (c == capacity)
signalNotFull();
return x;
}
//取节点从head出
private E dequeue() {
//head节点不存储item元素值
//删除head,并把head下一个节点作为新的head节点,并把其值置空,返回原来的值
Node<E> h = head;//头节点
Node<E> first = h.next;//头节点的后继节点为出队节点
h.next = h; // 方便GC,h.next指向空
head = first;//头节点指向first节点
E x = first.item;//返回first节点item值
first.item = null;//清空first节点的item值
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
} finally {
putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
}
package com.xj.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static void main(String[] args) throws InterruptedException {
//默认false为非公平模式,true为公平模式
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
new Thread(new Runnable() {
@Override
public void run() {
try{
Integer value = synchronousQueue.take();
System.out.println(Thread.currentThread().getName()+":"+value);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},"consumer1").start();
new Thread(new Runnable() {
@Override
public void run() {
try{
Integer value = synchronousQueue.take();
System.out.println(Thread.currentThread().getName()+":"+value);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},"consumer2").start();
Thread.sleep(1000);
new Thread(new Runnable() {
@Override
public void run() {
try{
synchronousQueue.put(1);
System.out.println(Thread.currentThread().getName()+":"+1);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},"product1").start();
new Thread(new Runnable() {
@Override
public void run() {
try{
synchronousQueue.put(2);
System.out.println(Thread.currentThread().getName()+":"+2);
}catch (InterruptedException e){
e.printStackTrace();
}
}
},"product2").start();
}
}
结果打印,默认采用的非公平模式
product1:1
consumer2:1
product2:2
consumer1:2
Process finished with exit code 0
非公平模式基于链表队列实现
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>();
公平模式基于栈结构实现
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<Integer>(true);
默认构造方法采用非公平模式,TransferQueue和TransferStack都继承Transferer抽象类并重写了transfer方法
/**
* Creates a {@code SynchronousQueue} with nonfair access policy.
*/
public SynchronousQueue() {
this(false);
}
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
Transferer抽象类
/**
* Shared internal API for dual stacks and queues.
*/
abstract static class Transferer<E> {
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract E transfer(E e, boolean timed, long nanos);
}
public void put(E e) throws InterruptedException {
//put元素为空,抛出空指针异常
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
//如果判断成立,说明是中断唤醒的,则设置中断标记并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
//
E e = transferer.transfer(null, false, 0);
//take元素不为空返回
if (e != null)
return e;
//如果take元素为null,说明是中断唤醒的,则设置中断标记并抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
不论出队还是入队操作都调用了transfer方法
//抽象方法,第一个参数为put元素值,为null说明是take操作
abstract E transfer(E e, boolean timed, long nanos);
采用链表数据结构,QNode节点结构如下:
static final class QNode {
//下一个节点
volatile QNode next; // next node in queue
//节点元素
volatile Object item; // CAS'ed to or from null
//节点所属的线程
volatile Thread waiter; // to control park/unpark
//put和take的标记,put为true,take为false
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
}
TransferQueue构造方法
TransferQueue() {
//构造空节点,并设置isData为false,表示为take的节点
QNode h = new QNode(null, false); // initialize to dummy node.
//头节点和尾节点都指向了当前空节点
head = h;
tail = h;
}
重写transfer方法
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
//判断当前e元素是否为空,e为空说明是take返回false,e不为空为put返回true
boolean isData = (e != null);
//自旋,保证CAS执行成功
for (;;) {
//指向尾节点
QNode t = tail;
//指向头节点
QNode h = head;
// 头尾为空,说明没有被初始化,继续下次循环
if (t == null || h == null) // saw uninitialized value
continue; // spin
//头节点和尾节点相等(队列中只有一个节点或者是只有一个空节点)或者模式相同(队尾节点的模式匹配)
if (h == t || t.isData == isData) { // empty or same-mode
//获取尾节点的下一个节点
QNode tn = t.next;
//尾结点不一致,继续下次循环(由于其他节点入队了,导致读不一致)
if (t != tail) // inconsistent read
continue;
//尾节点的后继节点不为空
if (tn != null) { // lagging tail
//CAS修改尾节点
advanceTail(t, tn);
//下次继续循环
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
//当前的节点不存在,构建QNode新节点
if (s == null)
s = new QNode(e, isData);
//CAS修改节点s的next指针
if (!t.casNext(null, s)) // failed to link in
//修改失败,则下次继续循环
continue;
//CAS将当前节点s修改为尾节点
advanceTail(t, s); // swing tail and wait
//自旋阻塞
Object x = awaitFulfill(s, e, timed, nanos);
//节点被取消、中断或超时
if (x == s) { // wait was cancelled
//清除当前s节点
clean(t, s);
//返回null
return null;
}
//没有被取消、中断或超时
if (!s.isOffList()) { // not already unlinked
// 如果是头节点,从队列中移除
advanceHead(t, s); // unlink if head
//当前元素不是空的
if (x != null) // and forget fields
//将当前节点的item执行当前节点
s.item = s;
//当前节点的持有线程变为null
s.waiter = null;
}
//返回处理后的元素,take操作传入null返回数据,put操作传入数据e返回null
return (x != null) ? (E)x : e;
//模式不同的逻辑
} else { // complementary-mode
//获取头节点的后继节点
QNode m = h.next; // node to fulfill
//尾节点不同||头节点的后继为空||头节点不同,则下次继续循环
if (t != tail || m == null || h != head)
continue; // inconsistent read
//获取头节点的后继节点元素
Object x = m.item;
if (isData == (x != null) || // m already fulfilled 节点位置有值
x == m || // m cancelled 节点被取消、中断或超时
!m.casItem(x, e)) { // lost CAS CAS修改item(无值修改为有值,有值修改为无值)
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
//解除线程驻留,即唤醒线程继续工作
LockSupport.unpark(m.waiter);
//返回处理后的元素,take传入null返回数据e,put传入数据返回null
return (x != null) ? (E)x : e;
}
}
}
awaitFulfill方法,自旋或阻塞线程,直到满足s.item != e(传入的数据不是当前数据)
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node 等待节点
* @param e the comparison value for checking match 元素
* @param timed true if timed wait 超时等待
* @param nanos timeout value 超时值
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
//计算超时时间,true则计算,false则就是0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//当前线程对象
Thread w = Thread.currentThread();
//计算需要自旋的次数
//如果头节点的后继节点就是s当前节点,根据cpu核心线程数计算相关的次数(例如32*16=512次数)
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋
for (;;) {
//判断中断标志,不会清除线程的状态标记
if (w.isInterrupted())
// 尝试取消当前节点
s.tryCancel(e);
// 获取当前节点的元素
Object x = s.item;
// 元素不相同,直接返回当前的元素(模式不匹配之后会将item由null变为e,由e变成null,unpark之后判断会成立返回x)
if (x != e)
return x;
// 如果设置了超时时间,判断超时时间
if (timed) {
// 计算剩余时间
nanos = deadline - System.nanoTime();
// 剩余时间小于0
if (nanos <= 0L) {
// 尝试取消当前节点
s.tryCancel(e);
// 继续下次循环
continue;
}
}
//自旋的次数大于0,执行次数自减操作
//自旋次数用完之后,设置waiter为当前线程
//其次判断没有设置超时,则直接进行阻塞park
if (spins > 0)
//次数减1
--spins;
else if (s.waiter == null)
//自旋次数用完之后,设置一下s的等待线程为当前线程
s.waiter = w;
else if (!timed)
//没有设置超时,则直接进行阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
// 超时时间大于1000L的时候,使用判断时间的阻塞(时间nanos大于0才会阻塞)
LockSupport.parkNanos(this, nanos);
}
}
void tryCancel(Object cmp) {
// CAS将传入的变量设置为this
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
后进先出的栈结构,采用链表实现
static final class SNode {
volatile SNode next; // next node in stack 后继节点
volatile SNode match; // the node matched to this 与之匹配成功的节点
volatile Thread waiter; // to control park/unpark 节点所属线程
Object item; // data; or null for REQUESTs 节点元素
int mode; //节点模式,DATA(put)和REQUEST(take)两种模式
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.
SNode(Object item) {
this.item = item;
}
static SNode snode(SNode s, Object e, SNode next, int mode) {
//s节点为空构建
if (s == null) s = new SNode(e);
//指定s节点的后继和模式
s.mode = mode;
s.next = next;
return s;
}
}
重写transfer方法
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
//e为空REQUEST模式(take),e不为空DATA模式(put)
int mode = (e == null) ? REQUEST : DATA;
//自旋
for (;;) {
//h指向头节点
SNode h = head;
//头节点为null说明栈为空 || 模式一样 压栈
if (h == null || h.mode == mode) { // empty or same-mode
//设置了超时时间并且时间小于等于0
if (timed && nanos <= 0) { // can't wait
//头节点不为空并且头节点被取消
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
//反之返回null
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {//没有设置超时,将s节点作为头节点,头节点作为s节点的后继,即压栈操作
//压栈成功,则自旋并阻塞当前线程,等待解除阻塞返回匹配成功的节点
SNode m = awaitFulfill(s, timed, nanos);
//解除阻塞后,如果
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill 头节点模式mode,判断是否模式是否为FULFILLING,不为FULFILLING往下执行
if (h.isCancelled()) // already cancelled 头节点被取消
casHead(h, h.next); // pop and retry 将头节点的下一个节点作为新的头节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//没被取消,将模式为2的节点压栈,并作为新的头节点
//自旋
for (;;) { // loop until matched or waiters disappear
//s节点当前模式为2,s.next为匹配节点
SNode m = s.next; // m is s's match
//m的节点为空
if (m == null) { // all waiters are gone 所有等待线程都执行完了
casHead(s, null); // pop fulfill node 如果s为头节点则重置s节点为null
s = null; // use new node next time s节点指向空
break; // restart main loop 退出for循环
}
//m节点的下一个
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller 头节点模式是FULFILLING
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//m的取值可能为0|1|2,FULFILLING为2,要使判断成立,只能取2
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
//尝试匹配节点
boolean tryMatch(SNode s) {
//match为null && 修改SNode的match成员变量,这里的this指调此方法的节点对象,从null修改为入参s节点
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
//获取调用此方法的对象的所属线程
Thread w = waiter;
if (w != null) { // waiters need at most one unpark 所属线程不为空
waiter = null; //重置
LockSupport.unpark(w); //唤醒
}
return true; //匹配成功
}
return match == s;
}
awaitFulfill方法自旋或阻塞线程,直到匹配成功
/**
* Spins/blocks until node s is matched by a fulfill operation.
*
* @param s the waiting node
* @param timed true if timed wait
* @param nanos timeout value
* @return matched node, or s if cancelled
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//计算截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//获取当前线程对象
Thread w = Thread.currentThread();
//计算自旋次数,根据机器CPU核心线程数进行计算
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋
for (;;) {
//判断中断标记不清除
if (w.isInterrupted())
//尝试取消节点
s.tryCancel();
//获取匹配节点
SNode m = s.match;
//判断匹配节点不为null则返回
if (m != null)
return m;
//如果设置了超时,计算剩余时间
if (timed) {
nanos = deadline - System.nanoTime();
//剩余时间少于等于0,尝试取消节点
if (nanos <= 0L) {
s.tryCancel();
//下次继续循环
continue;
}
}
//判断自旋次数是否还有剩余
if (spins > 0)
//
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
//自旋次数用完之后,设置所属线程
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
//没有设置超时则park阻塞
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
//超时时间大于1000L,使用判断时间的阻塞(时间nanos大于0才会阻塞)
LockSupport.parkNanos(this, nanos);
}
}
boolean shouldSpin(SNode s) {
//头节点
SNode h = head;
//头节点跟当前节点相同 || 头节点为null || 如果m的位设置满足则返回true
return (h == s || h == null || isFulfilling(h.mode));
}