抽象同步队列AQS源码学习
1、AQS类结构剖析
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); // 用来标记该线程是获取共享资源时被阻塞挂起后放入到AQS队列的
static final Node EXCLUSIVE = null; // 用来标记线程是获取独占资源时被挂起后放入AQS队列的
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;// 线程被取消了
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1; // 线程需要被唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;// 线程在条件队列里面等待
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;// 释放共享资源时需要通知其他节点。
volatile int waitStatus; // 记录当前线程等待状态, 可以为CANCELLED,SIGNAL,CONDITION,PROPAGATE
volatile Node prev; // 记录前驱节点
volatile Node next; // 记录后继节点
volatile Thread thread;// 用来存放进入AQS队列里面的线程
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS类结构如上图所示,AQS是一个FIFO的双向队列,队列元素的类型为Node,如上代码所示。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
private transient volatile Node head;// 队列的头节点
private transient volatile Node tail; // 队列的尾节点
private volatile int state; // 状态信息
protected final int getState() { // 获取状态信息
return state;
}
protected final void setState(int newState) { // 修改状态信息
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
....
}
在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetSetState函数修改其值。对于reentrantlock的实现来说,state可以用来表示当前线程获取锁的可重入次数;对于读写锁reentrantreadwritelock来说,state的高16位表示读状态, 也就是获取该读锁的次数, 低16位表示获取到写锁的线程的可重入次数。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter; // 条件队列的头节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 条件队列的尾节点
public ConditionObject() { }
....
}
AQS还有个内部类ConditionObject,用来结合锁实现线程同步。ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程,如上代码所示,这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
当一个线程调用acquire方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己。
-
addWaiter
如果tryAcquire
尝试获取锁失败,则调用addWaiter
方法将当前线程添加到一个等待队列
中,等待后续处理; -
acquireQueued
方法处理加入到队列中的节点(Node),通过自旋
去尝试获取锁,会根据前驱节点的waitStatus的情况将线程挂起
或者取消
。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
当一个线程调用release方法时,会尝试使用tryRelease方法操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark方法激活AQS队列里面被阻塞的一个线程。被激活的线程则使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
但是AQS并没有提供可用的tryAcquire和tryRelease方法,它们需要由具体的子类去实现,子类在实现的时候要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。
比如reentrantlock,定义当status为0时表示锁空闲,为1时表示锁已经被占用。在重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,然后返回true,如果CAS失败则返回false。
AQS的入队操作,当一个线程获取锁失败后,该线程会被转换为Node节点,然后就会使用enq方法将该节点插入到AQS的阻塞队列。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2、AQS条件变量的支持
synchronized内置锁实现线程间同步可以通过notify和wait来实现,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。
它们的不同在于synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.reentrantlock;
/**
* @author Wenbo
* @version 1.0
* @program
* @description
* @date 2022/5/25 15:52
*/
public class AQS {
public static void main(String[] args) {
reentrantlock lock = new reentrantlock();
Condition condition = lock.newCondition();
lock.lock();
try{
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
}catch (Exception e){
e.printstacktrace();
}finally {
lock.unlock();
}
lock.lock();
try{
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
}catch (Exception e){
e.printstacktrace();
}finally {
lock.unlock();
}
}
}
代码中Lock对象等价于synchronized加上共享变量,调用lock.lock方法就相当于进入了synchronized块,调用lock.unlock方法就相当于退出synchronized快。调用条件变量的await方法就相当于调用共享变量的wait方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalall就相当于调用共享变量的notifyall()方法。
lock.newCondition的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。这个条件队列和AQS队列不是一回事。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建新的node节点,并插入到条件队列末尾
Node node = addConditionWaiter();
// 释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//调用park方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上面代码中,当线程调用条件变量的await()方法时,在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。这时候如果有其他线程调用lock.lock尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await方法,则该线程也会被放入条件变量的阻塞队列,随后释放获取到的锁,在await方法处阻塞。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个类型为Node.CONDITION的节点。
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 向单向条件队列尾部插入一个元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
在下面代码中,当另外一个线程调用条件变量的signal方法时,在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
3、总结
当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。
这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。
当另外一个线程条用条件变量的signal()或者signalall方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
一个锁对应一个阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。