一、为什么出现了锁
1.出现线程的原因
首先,随着cpu集成的晶体管增加达到瓶颈,且集成晶体管越多,造价越贵,因此软件开发从单核多任务的并发,转为将任务分解到多个廉价的cpu上并发执行,因此就出现了线程。
线程出现,允许程序中不同的部分都能独立的执行不同的动作
- 可以解决IO并发(比如一个线程从磁盘上读数据,当它在等待的时候,你可以用另一个线程,来做计算或者从某个磁盘的地方读取数据或者向网络发送一条消息并等待回复。
- 可以多核并行,让多核计算机能够发挥作用。
- 执行周期性任务,但不愿意阻塞主线程,比如有一个 master 服务需要周期性的检查它的 worker 服务是否一直存活,如果宕机就要把工作扔到另一台机器上去执行。
ps:除了线程能实现并发,事件驱动模型编程也可以(常用于处理异步事件,就是事件发生和不受程序执行流程的影响)
2.出现锁的原因(互斥与同步)
- 资源竞争
线程是共享地址空间,共享内存的,所以就会出现资源竞争的问题,比如两个线程同时想要修改同一个数据时,应该怎么办,你肯定是想要一次只允许一个线程来修改的,这时就出现了锁这个概念,锁就是保证共享资源同时只能被一个线程访问的机制,这个访问共享资源的代码叫临界区(锁只锁住了临界区,怎么锁的不需要程序员去考虑)
- 线程协作
有一些情况故意的想让不同的线程之间互相受到制约,比如你生产数据,我想在你生产完数据前一直等待,直到你完成后,我再去读取
3.锁机制实现方式
- 信号量
- 用锁解决互斥访问时,共享变量初始化为1用锁解决同步访问时,进程同步则初始化为0, 初始信号量为 0,这样所有的线程调用 P 操作时都无法获取到锁,只能进行等待队列(相当于管程中的等待队列),当其余线程 B 调用 V 操作时会唤醒等待线程。
信号量机制的引入解决了进程同步和互斥问题,但信号量的大量同步操作分散在各个进程中不便于管理,还有可能导致系统死锁。如:生产者消费者问题中将P、V颠倒可能死锁,另外条件越多,需要的信号量就越多,需要更加谨慎地处理信号量之间的处理顺序,否则很容易造成死锁现象。所以出现了管程
-
管程
- 管程为了解决了信号量在临界区的 PV 操作上配对的麻烦,封装了PV操作并加入了条件变量。是一种在信号量机制上进行改进的并发编程模型
- 管程由四部分组成:
- 管程内部的共享变量。
- 管程内部的条件变量。
- 管程内部并行执行的进程。
- 对于局部与管程内部的共享数据设置初始值的语句。
- 比如就医的流程,病人首先第一步需要挂号,取到号后在侯诊室等待叫号,叫到自己时,就从入口进入就诊室就诊了,就诊时,有两种情况,第一种是医生很快就确定病人的病,并作出诊断,诊断完成后,就通知下一位病人进来就诊,另一种是医生无法确定病因,需要病人去做个验血 / CT 检查才能确定病情,于是病人就先去验个血 / CT,病人验完血 / 做完 CT 后,重新取号,等待叫号,病人等到自己的号,病人又重新拿着验血 / CT 报告去找医生就诊
- 整个流程如下
-
那么管程是如何解决互斥和同步的呢
第一种情况是病人进入就诊室后,无需做验血 / CT 等操作,于是医生诊断完成后,就会释放共享资源(解锁)去通知入口等待队列的下一个病人,下一个病人听到叫号后就能看医生了。这是管程实现互斥的情况
还有一种情况是同步的情况,如果病人进入就诊室后需要做验血 / CT 等操作,会去验血 / CT 队列(条件队列)排队, 这时释放了共享变量即医生,所以需要通知入口等待队列的其他病人(线程)去获取共享变量(医生),获得许可的线程执行完临界区的逻辑后会唤醒条件变量等待队列中的线程,将它放到入口等待队列中 ,等到其获取共享变量(医生)时,即可进入入口(临界区)处理。这是管程处理同步的情况
二、AQS(AbstractQueuedSynchronizer)
Java中实现锁的基础,AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
原理概述:基于管程原理进行实现。它维护了一个共享资源 state 和一个由双向链表实现的等待队列(即上文中管程的入口等待队列),底层利用了 CAS 机制来保证操作的原子性。
1. AQS结构
state 由于是多线程共享变量,所以必须定义成 volatile,以保证 state 的可见性和有序性,,但不能保证原子性,所以 AQS 提供了对 state 的原子操作方法,保证了线程安全。
-
getState():获取当前同步状态。
-
setState(int newState):设置当前同步状态。
-
compareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。
-
state 为什么要提供 setState 和 compareAndSetState 两种修改状态的方法?
这个问题,取决于修改时是否存在锁竞争,如果有则必须使用 compareAndSetState。
另外 AQS 中实现的 FIFO 队列(CLH 队列)其实是双向链表实现的,由 head, tail 节点表示,head 结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放。
1.1等待队列
- 头节点指向当前持有锁的线程 :volatile Node head
- 尾节点每个新的节点进来,都插入到最后 :volatile Node tail
- 共享变量state,在独占锁重入锁时,state表示锁持有的次数 :volatile int state
- 当前持有锁的线程 :Thread exclusiveOwnerThread
等待队列示意如下所示,注意,之后分析过程中所说的 queue,也就是阻塞队列不包含 head
1.2Node节点属性
等待队列中每个线程被包装成一个 Node 实例,数据结构是链表
总结来说,节点就是thread + waitStatus +( pre + next)/nextWaiter
volatile int waitStatus; // 结点状态
volatile Node prev; // 同步队列:互斥等待队列 Lock
volatile Node next; // 同步队列
volatile Thread thread; // 阻塞的线程
Node nextWaiter; // 等待队列:条件等待 Condition
2.获取锁
先按独占模式的可重入的公平锁(reentrantlock)讲解源码
-
//公平锁实现lock()接口 final void lock() { acquire(1); } //首先 调用 tryAcquire 尝试着获取 state,如果成功,则跳过后面的步骤。 //如果失败,则执行addWaiter将线程封装成节点作为 acquireQueued入参, //acquireQueued方法将线程加入 阻塞队列中。 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
-
tryAcquire:
具体获取锁的策略,由AQS的子类实现。这个方法只是尝试获取锁一下,获取失败再继续下面的处理,成功了就直接返回了,进程就直接获取锁了,这里使用的是乐观锁的思想,先假设没有并发冲突,有冲突了再去处理。
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// tryAcquire的公平版
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0) {
// 公平锁,既然是公平,就得讲究先来后到,
// 判断是否有线程在阻塞,因为是公平锁,所以需要按照申请锁的顺序来获取锁
if (!hasQueuedPredecessors() &&
// 没有线程在阻塞说明可以获取锁,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了
//因此会返回失败
compareAndSetState(0, acquires)) {
// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
setExclusiveOwnerThread(current);
return true;
}
}
// state不等于0,可能重入了
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)//int溢出
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- addWaiter:通过 enq 方法添加到阻塞队列中,然后放入队列尾部。因为并发时头节点和尾节点可能在实时变化并且是线程共享的,所以需要CAS操作来设置尾节点和尾节点,CAS失败时就需要自旋操作,所以enq方法就是在队空或者CAS失败时自旋将线程加入,并返回此节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
Node pred = tail;
// tail!=null => 队列不为空
if (pred != null) {
// 将当前的队尾节点,设置为自己的前驱
node.prev = pred;
// 用CAS把自己设置为队尾, 如果成功后,tail == node 了,
// 这个节点成为阻塞队列新的尾巴
if (compareAndSetTail(pred, node)) {
// 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
// 上面已经有 node.prev = pred,加上下面这句,
// 也就实现了和之前的尾节点双向连接了
pred.next = node;
// 线程入队了,可以返回了
return node;
}
}
// 如果会到这里,
// 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
enq(node);
return node;
}
// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 之前说过,队列为空也会进来这里
if (t == null) { // Must initialize
// 初始化head节点
// 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
// 还是一步CAS,,现在可能是很多线程同时进来
if (compareAndSetHead(new Node()))
// 给后面用:这个时候head节点的waitStatus==0
// 这个时候有了head,但是tail还是null,设置一下,
// 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
// 注意:这里只是设置了tail=head,这里没return
// 所以,设置完了以后,继续for循环,下次就到下面的else分支了
tail = head;
} else {
// 下面几行,和上一个方法 addWaiter 是一样的
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
acquireQueued:线程进入同步队列后,会将该线程挂起,直到有甚至线程唤醒该线程。
-
假设此时有三个线程t1,t2,t3,t1获取锁成功,t2,t3入队了
现在问题来了, T2,T3 入队后怎么处理,马上阻塞吗,马上阻塞意味着线程从运行态转为阻塞态 ,涉及到用户态向内核态的切换,而且唤醒后也要从内核态转为用户态,开销相对比较大,所以 AQS 对这种入队的线程采用的方式是让它们自旋来竞争锁,如下图示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8QOs2ZbG-1661247947899)(https://confluence.xsops.cn/download/attachments/81901675/%EF%BC%98%20%281%29.png?version=1&modificationDate=1660531713000&api=v2)]
这样会产生一个问题,如果当前锁是独占锁,如果锁一直被 T1 占有, T2,T3 一直自旋没太大意义,反而会占用 cpu,影响性能,所以更合适的方式是它们自旋一两次竞争不到锁后识趣地阻塞以等待前置节点释放锁后再来唤醒它。AQS设置的是如果当前节点的上一个节点不为 head,且它的状态为 SIGNAL,则结点进入阻塞状态。只有当获取到锁或者出现异常,节点才会终止自旋
-
// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话, // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false final boolean acquireQueued(final Node node, int arg) { boolean Failed = true; try { boolean interrupted = false; for (;;) { //自旋获取锁 final Node p = node.predecessor(); // p == head 说明当前节点是阻塞队列的第一个节点 // 所以当前节点可以去试抢一下锁 // 这里我们说一下,为什么可以去试试: // 首先,它是队头,这个是第一个条件, // 其次,当前的head有可能是刚刚初始化的虚拟节点,当前持有锁的线程节点为空 if (p == head && tryAcquire(arg)) { //抢占锁成功 setHead(node); //将 head 结点指向当前节点,原 head 结点出队 p.next = null; // help GC Failed = false; return interrupted; } // 如果前一个节点不是 head 或者竞争锁失败,则进入阻塞状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 什么时候 Failed 会为 true??? // tryAcquire() 方法抛异常的情况 if (Failed) cancelAcquire(node); } }
-
流程图如下:
-
-
shouldParkAfterFailedAcquire:将前驱结点的状态修改成 SIGNAL,同时会清理已经 CANCELLED 的结点。注意,只有前驱结点的状态为 SIGNAL,当它释放锁时才会唤醒后继结点。返回 true 代表线程可以进入阻塞中断,false会再次自旋获取锁
- 整体步骤如下:
- 首先,如果前驱节点为 SIGNAL,则当前节点可以进入阻塞状态。
- 如果前驱节点为取消状态,则前驱节点需要移除
- 如果前驱节点小于等于 0,则需要首先将其前驱节点置为 SIGNAL,这样下一次自旋后发现前驱节点为 SIGNAL,就会返回 true(即步骤 1)
- 为什么第二次获取 失败才返回true
- 整体步骤如下:
// "当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点的 waitStatus == -1 ,当前线程需要阻塞,直接可以返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
// 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔细想想,如果进入到这个分支意味着什么
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 在我们前面的源码中,都没有看到有设置waitStatus的,
// 所以每个新的node入队时,waitStatu都是0
// 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,
// 然后再次进来此方法,此时会从第一个分支返回 true
return false;
}
// 如果shouldParkAfterFailedAcquire(p, node)返回true,
// 那么需要执行parkAndCheckInterrupt():
// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();// 如果中断状态为true,那么park无法阻塞。
// 所以需要判断中断状态并清空
// 因为清空了,后面需要补上中断selfInterrupt()
}
}
private void cancelAcquire(Node node) {
// 如果节点为空,直接返回
if (node == null)
return;
// 由于线程要被取消了,所以将 thread 线程清掉
node.thread = null;
// 下面这步表示将 node 的 pre 指向之前第一个非取消状态的结点
// (即跳过所有取消状态的结点),waitStatus > 0 表示当前结点状态为取消状态
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取经过过滤后的 pre 的 next 结点,
// 这一步主要用在后面的 CAS 设置 pre 的 next 节点上
Node prednext = pred.next;
// 将当前结点设置为取消状态
node.waitStatus = Node.CANCELLED;
// 如果当前取消结点为尾结点,使用 CAS 则将尾结点设置为其前驱节点,
// 如果设置成功,则尾结点的 next 指针设置为空
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, prednext, null);
} else {
// 如果当前节点不是尾节点,是要把当前节点的前驱节点指向当前节点的后继节点
// 但是要唤醒或阻塞结点,须在其前驱节点的状态为 SIGNAL 的条件才能操作,
// 所以在设置 pre 的 next 节点时要保证 pre 结点的状态为 SIGNAL
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, prednext, next);
} else {
// 如果 pre 为 head,或者 pre 的状态设置 SIGNAL 失败,
// 则直接唤醒后继结点去竞争锁,
// 因为SIGNAL 的结点取消(或释放锁)后可以唤醒后继结点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
-
首先设置取消节点状态
node.thread = null
node.waitStatus = Node.CANCELLED;
-
将队内取消状态的节点出队,如果node为尾节点(只用往前找)
1. ④为CAS操作,会往前找不是取消节点的线程作为尾节点
-
如果node不为尾节点也不为头节点的后继节点(前后都需要找不是取消状态的节点) ④和⑥是CAS操作
shouldParkAfterFailedAcquire会移除取消状态的结点,所以这里取消状态的节点会被 GC
4.如果取消节点是头节点的后继节点,则直接唤醒其后继节点 ④之后是acquireAQueue(s)的操作
为什么是前驱
思考1:shouldParkAfterFailedAcquire等方法 为什么选择 node.prev 前驱结点的原子性,而 node.next 后继结点则是辅助结点?
- next 域:需要修改二处来保证原子性,一是 tail.next;二是 tail 指针。
- prev 域:只需要修改一处来保证原子性,就是 tail 指针。你可能会说不需要修改 node.prev 吗?当然需要,但 node 还没添加到链表中,其 node.prev 修改并没有锁竞争的问题,将 tail 指针指向 node 时,如果失败会通过自旋不断尝试。
前驱和后驱原子性操作对比
说明: 通过上图,前驱结点只需要一次原子性操作就可以,而后继结点则需要二次原子性操作,复杂性就会大提升,这就是 AQS 选择前驱结点进行原子性操作的原因。
思考2:取消节点操作的都是next指针,什么情况下会对Prev指针进行操作?
执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。 shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
3.释放锁
-
release:lock.unlock() 方法用于释放锁。释放锁相比获取锁要简单一些,因为此时线程已经获取到锁,可以不使用 CAS 原子性操作和自旋操作。
这里释放锁的条件为啥是 h != null && h.waitStatus != 0 呢。
- 如果 h == null, 这有两种可能,一种是阻塞队列为空,当然没有所谓的唤醒后继节点,一种是其他线程正在竞争锁,只是还未初始化头节点,既然其他线程正在运行,也就无需执行唤醒操作
- 如果 h != null 且 h.waitStatus == 0,说明 head 的后继节点正在自旋竞争锁,也就是说线程没有被阻塞,无需唤醒。
- 如果 h != null 且 h.waitStatus < 0, 此时 waitStatus 值可能为 SIGNAL,或 PROPAGATE,这两种情况说明后继结点阻塞需要被唤醒
-
tryRelease:具体释放锁的策略,由子类实现。
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有持有锁的线程才能释放锁,所以如果当前线程不是持有锁的线程,则抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 说明线程持有的锁全部释放了,需要释放 exclusiveOwnerThread 的持有线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); }//如果锁没有全部释放,只更新state(锁的数量) setState(c); return free;
}
- unparkSuccessor:唤醒后继结点。这里需要注意unparkSuccessor方法的参数node是想要唤醒的节点的前继节点,因为后继节点将要唤醒,所以先置为0,避免重复唤醒,一般是要唤醒后继节点的,但是如果后继节点为空或者取消了就只能从尾部往前找最接近的。最后unpark唤醒它
private void unparkSuccessor(Node node) {
// 1. node表示需要唤醒的结点的前继节点,将其状态重新设置为INITIAL。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// node.prev是线程安全的,而node.next则不是线程安全的
// 为啥要从后往前找呢,因为节点入队并不是原子操作
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 一般为后继节点,但是后继节点为空或者取消了
s = null; // 就只能从后往前找最接近的
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 3. 唤醒同步线程,当然这个线程不一定能抢占到锁。比如非公平锁
if (s != null)
LockSupport.unpark(s.thread);
}
4.Condition条件同步
Condition 等待队列(waitQueue)要比 Lock 同步队列(syncQueue)简单很多,最重要的原因是 waitQueue 的操作都是在获取锁的线程中执行,不存在数据竞争的问题。同时因为没有唤醒等复杂过程,所以条件队列是一个单向链表
每个 锁实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例
- 每个 condition 有一个关联的条件队列,如线程 1 调用
condition1.await()
方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行; - 调用
condition1.signal()
触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
- 头指针:Node firstWaiter
- 尾指针:Node lastWaiter
ConditionObject 重要的方法说明:
-
await:阻塞线程并放弃锁,加入到等待队列中。
// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly() // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断 public final void await() throws InterruptedException { // 既然该方法要响应中断,那么在最开始就判断中断状态 if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的条件队列中 Node node = addConditionWaiter(); // 释放锁,返回值是释放锁之前的 state 值 // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 这里退出循环有两种情况 // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了 // 2. checkInterruptWhileWaiting检查中断,如果在signal()之前被中断返回THROW_IE, // 如果signal()之后被中断则返回REINTERRUPT,如果没有被中断则返回0。 while (!isOnSyncQueue(node)) { LockSupport.park(this); //阻塞,直到signal()或中断。 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后,将进入阻塞队列,等待获取锁, // acquireQueued会返回是否中断过,是则要更新interruptMode if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0)//如果在阻塞时被中断,则抛出InterruptedException。 reportInterruptAfterWait(interruptMode); }
-
addConditionWaiter:将结点(状态为 CONDITION)添加到等待队列 waitQueue 中,不存在锁竞争。
// 将当前线程对应的节点入队,插入队尾 private Node addConditionWaiter() { Node t = lastWaiter; // 如果条件队列的最后一个节点取消了,将其清除出去 // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队? if (t != null && t.waitStatus != Node.CONDITION) { // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // t 此时是 lastWaiter,队尾 // 如果队列为空 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
-
fullyRelease:释放锁,并唤醒后继等待线程。
// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。 // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1 // 相应的,如果 lock 重入了 n 次,savedState == n // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean Failed = true; try { int savedState = getState(); // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁, // 将 state 置为 0 if (release(savedState)) { Failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (Failed) node.waitStatus = Node.CANCELLED; } }
-
isOnSyncQueue:根据结点是否在同步队列上,判断等待线程是否已经被唤醒。
// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION // signal 的时候需要将节点从条件队列移到阻塞队列, // 这个方法就是判断 node 是否已经移动到阻塞队列了 final boolean isOnSyncQueue(Node node) { // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到 // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中 // 如果 node 的前驱 prev 指向还是 null,肯定没有在阻塞队列(prev是阻塞队列链表中使用的) if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了 if (node.next != null) return true; // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的, // 说明在阻塞队列,否则就是不在阻塞队列 // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。 // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail, // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。 return findNodeFromTail(node); } // 从阻塞队列的队尾往前遍历,如果找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
-
unlinkCancelledWaiters:清理取消等待的线程。
// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去 // 纯属链表操作,很好理解,看不懂多看几遍就可以了 private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; // 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
-
signal:唤醒等待线程,没有特殊的要求,尽量使用 signalAll。
// 将等待时间最长的线程(如果存在)从该条件的等待队列移动到所属锁的等待队列。 public final void signal() { // 调用 signal 方法的线程必须持有当前的独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 从条件队列队头往后遍历,找出第一个需要转移的 node // 因为有些线程会取消排队,但是可能还在队列中 private void doSignal(Node first) { do { // 将 firstWaiter 指向 first 节点后面的第一个,因为原来的first 节点马上要离开了 // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移 } // 将节点从条件队列转移到阻塞队列 // true 代表成功转移 // false 代表在 signal 之前,节点已经取消了 final boolean transferForSignal (Node node) { // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消, // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 // 否则,将 waitStatus 置为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋进入阻塞队列的队尾 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点 Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用, // 节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) // 这样这个新加的节点才能被前驱节点唤醒, // 如果这时前驱节点取消,前驱节点是在获取锁的过程中取消了, // 所以其后续节点可以去提前唤醒线程,这里是出于性能考虑吧,不唤醒也不会出错 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程 LockSupport.unpark(node.thread); return true; }
CAS 失败原因为ws>0判断后节点取消了,这里唤醒是出于性能的原因,不唤醒也不会出错,唤醒能
解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0
- REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
- THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
- 0 :说明在 await 期间,没有发生中断
有以下三种情况会让 LockSupport.park(this); 这句返回继续往下执行:
- 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
- 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
- signal 的时候我们说过,转移以后的前驱节点取消了,在
- 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题
线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。
// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。
看看怎么判断是 signal 之前还是之后发生的中断:
// 只有线程处于中断状态,才会调用此方法
// 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true:如果此线程在 signal 之前被取消,
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 将节点状态设置为 0
// 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将节点放入阻塞队列
// 这里我们看到,即使中断了,依然会转移到阻塞队列
enq(node);
return true;
}
// 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
// signal 方法会将节点转移到阻塞队列,但是可能还没完成,线程让步直到转移完成
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
即使发生了中断,节点依然会转移到阻塞队列。
while 循环怎么退出。要么中断,要么转移成功。
这里描绘了一个场景,本来有个线程,它是排在条件队列的后面的,但是因为它被中断了,那么它会被唤醒,然后它发现自己不是被 signal 的那个,但是它会自己主动去进入到阻塞队列。
while 循环出来以后,下面是这段代码:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
由于 while 出来后,我们确定节点已经进入了阻塞队列,准备获取锁。
这里的 acquireQueued(node, savedState) 的第一个参数 node 之前已经经过 enq(node) 进入了队列,参数 savedState 是之前释放锁前的 state,这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState了。
注意,前面我们说过,不管有没有发生中断,都会进入到阻塞队列,而 acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。
继续往下:
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
本着一丝不苟的精神,这边说说 node.nextWaiter != null
怎么满足。我前面也说了 signal 的时候会将节点转移到阻塞队列,有一步是 node.nextWaiter = null,将断开节点和条件队列的联系。
可是,在判断发生中断的情况下,是 signal 之前还是之后发生的?
这部分的时候,我也介绍了,如果 signal 之前就中断了,也需要将节点进行转移到阻塞队列,这部分转移的时候,是没有设置 node.nextWaiter = null 的。
之前我们说过,如果有节点取消,也会调用 unlinkCancelledWaiters 这个方法,就是这里了。
处理中断状态
到这里,我们终于可以好好说下这个 interruptMode 干嘛用了。
- 0:什么都不做,没有被中断过;
- THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断;
- REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
5.为什么需要 AQS
Java 已经在语言层次提供 synchronized 锁,为什么要在 SDK 层次提供 AQS 锁?
synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态,也释放不了线程已经占有的资源,容易导致死锁
- 能够响应中断。synchronized 一旦进入阻塞状态,就无法被中断。但如果阻塞状态的线程能够响应中断信号,能够被唤醒。这样就破坏了不可抢占条件了。
- 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
- 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
public class MyLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
public class Main {
static int count = 0;
static MyLock leeLock = new MyLock ();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
MyLock.lock();
for (int i = 0; i < 100; i++) {
count++;
}
} catch (Exception e) {
e.printstacktrace();
} finally {
MyLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
System.out.println(count);
}
}
6.简单应用
AQS底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:需要用一个子类继承它,然后选择实现它指定的一些方法,其他的事情这个父类都会帮我做好的
isHeldExclusively()//该线程是否正在独占资源。只有用到condition或者重入锁才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException。 AQS类中的其他方法都是final ,所以无法被其他类重写,只有这几个方法可以被其他类重写。
通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。
参考资料:
https://javadoop.com/post/AbstractQueuedSynchronizer
https://www.cnblogs.com/binarylei/p/12555166.html
原文地址:https://www.jb51.cc/wenti/3285149.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。