正文
1. J.U.C的lock包结构
上一篇文章讲了并发编程的锁机制:synchronized和lock,主要介绍了Java并发编程中常用的锁机制。Lock是一个接口,而synchronized是Java中的关键字,synchronized是基于jvm实现。Lock锁可以被中断,支持定时锁等。Lock的实现类,可重入锁ReentrantLock,我们有讲到其具体用法。而谈到ReentrantLock,不得不谈抽象类AbstractQueuedSynchronizer(AQS)。抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、ThreadPoolExecutor。
2. AQS介绍
AQS是一个抽象类,主是是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。AQS抽象类包含如下几个方法:
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。共享模式时只用 Sync Queue, 独占模式有时只用 Sync Queue, 但若涉及 Condition, 则还有 Condition Queue。在子类的 tryAcquire, tryAcquireShared 中实现公平与非公平的区分。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
整个 AQS 分为以下几部分:
- Node 节点, 用于存放获取线程的节点, 存在于 Sync Queue, Condition Queue, 这些节点主要的区分在于 waitStatus 的值(下面会详细叙述)
- Condition Queue, 这个队列是用于独占模式中, 只有用到 Condition.awaitXX 时才会将 node加到 tail 上(PS: 在使用 Condition的前提是已经获取 Lock)
- Sync Queue, 独占 共享的模式中均会使用到的存放 Node 的 CLH queue(主要特点是, 队列中总有一个 dummy 节点, 后继节点获取锁的条件由前继节点决定, 前继节点在释放 lock 时会唤醒sleep中的后继节点)
- ConditionObject, 用于独占的模式, 主要是线程释放lock, 加入 Condition Queue, 并进行相应的 signal 操作。
- 独占的获取lock (acquire, release), 例如 ReentrantLock。
- 共享的获取lock (acquireShared, releaseShared), 例如 ReeantrantReadWriteLock, Semaphore, CountDownLatch
下面我们具体来分析一下AQS实现的源码。
3. 内部类 Node
Node 节点是代表获取lock的线程, 存在于 Condition Queue, Sync Queue 里面, 而其主要就是 nextWaiter (标记共享还是独占),waitStatus 标记node的状态。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException{
Node p = prev;
if(p == null){
throw new NullPointerException();
}else{
return p;
}
}
Node(){
}
Node(Thread thread, Node mode){
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus){
this.waitStatus = waitStatus;
this.thread = thread;
}
}
waitStatus的状态变化:
- 线程刚入 Sync Queue 里面, 发现独占锁被其他人获取, 则将其前继节点标记为 SIGNAL, 然后再尝试获取一下锁(调用 tryAcquire 方法)
- 若调用 tryAcquire 方法获取失败, 则判断一下是否前继节点被标记为 SIGNAL, 若是的话 直接 block(block前会确保前继节点被标记为SIGNAL, 因为前继节点在进行释放锁时根据是否标记为 SIGNAL 来决定唤醒后继节点与否 <- 这是独占的情况下)
- 前继节点使用完lock, 进行释放, 因为自己被标记为 SIGNAL, 所以唤醒其后继节点
waitStatus 变化过程:
- 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)
- 独占模式 + 使用 Condition情况下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)其上可能涉及 中断与超时, 只是多了一个 CANCELLED, 当节点变成 CANCELLED, 后就等着被清除。
- 共享模式下: 0(初始) -> PROPAGATE(获取 lock 或release lock 时) (获取 lock 时会调用 setHeadAndPropagate 来进行 传递式的唤醒后继节点, 直到碰到 独占模式的节点)
- 共享模式 + 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0)
其上的这些状态变化主要在: doReleaseShared , shouldParkAfterFailedAcquire 里面。
4. Condition Queue
Condition Queue 是一个并发不安全的, 只用于独占模式的队列(PS: 为什么是并发不安全的呢? 主要是在操作 Condition 时, 线程必需获取 独占的 lock, 所以不需要考虑并发的安全问题);
而当Node存在于 Condition Queue 里面, 则其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 代表node进行转移到 Sync Queue里面, 或被中断/timeout)); 这里有个注意点, 就是当线程被中断或获取 lock 超时, 则一瞬间 node 会存在于 Condition Queue, Sync Queue 两个队列中.
节点 Node4, Node5, Node6, Node7 都是调用 Condition.awaitXX 方法加入 Condition Queue(PS: 加入后会将原来的 lock 释放)。
4.1 入队列方法 addConditionWaiter
将当前线程封装成一个 Node 节点放入到 Condition Queue 里面大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况。
private Node addConditionWaiter(){
Node t = lastWaiter;
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if(t == null){
firstWaiter = node;
} else {
t.nextWaiter = node;
}
lastWaiter = node;
return node;
}
4.2 删除Cancelled节点的方法 unlinkCancelledWaiters
当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是被中断或超时。在调用 addConditionWaiter 将线程放入 Condition Queue 里面时或 awiat 方法获取结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点。这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点。
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter;
if(t.waitStatus != Node.CONDITION){
t.nextWaiter = null;
if(trail == null){
firstWaiter = next;
} else {
trail.nextWaiter = next;
}
if(next == null){
lastWaiter = trail;
}
}else{
trail = t;
}
t = next;
}
}
4.3 转移节点的方法 transferForSignal
transferForSignal只有在节点被正常唤醒才调用的正常转移的方法。
将Node 从Condition Queue 转移到 Sync Queue 里面在调用transferForSignal之前, 会 first.nextWaiter = null;而我们发现若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0
final boolean transferForSignal(Node node){
if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
return false;
}
Node p = enq(node);
int ws = p.waitStatus;
if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
LockSupport.unpark(node.thread);
}
return true;
}
4.4 转移节点的方法 transferAfterCancelledWait
transferAfterCancelledWait 在节点获取lock时被中断或获取超时才调用的转移方法。将 Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移
final boolean transferAfterCancelledWait(Node node){
if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){
enq(node);
return true;
}
while(!isOnSyncQueue(node)){
Thread.yield();
}
return false;
}
5. Sync Queue
AQS内部维护着一个FIFO的CLH队列,所以AQS并不支持基于优先级的同步策略。至于为何要选择CLH队列,主要在于CLH锁相对于MSC锁,他更加容易处理cancel和timeout,同时他具备进出队列快、无所、畅通无阻、检查是否有线程在等待也非常容易(head != tail,头尾指针不同)。当然相对于原始的CLH队列锁,ASQ采用的是一种变种的CLH队列锁:
- 原始CLH使用的locked自旋,而AQS的CLH则是在每个node里面使用一个状态字段来控制阻塞,而不是自旋。
- 为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
- head结点使用的是傀儡结点。
这个图代表有个线程获取lock, 而 Node1, Node2, Node3 则在Sync Queue 里面进行等待获取lock(PS: 注意到 dummy Node 的SINGNAL 这是叫获取 lock 的线程在释放lock时通知后继节点的标示)
5.1 Sync Queue 节点入Queue方法
这里有个地方需要注意, 就是初始化 head, tail 的节点, 不一定是 head.next, 因为期间可能被其他的线程进行抢占了。将当前的线程封装成 Node 加入到 Sync Queue 里面。
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred, node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){
if(compareAndSetHead(new Node())){
tail = head;
}
}else{
node.prev = t;
if(compareAndSetTail(t, node)){
t.next = node;
return t;
}
}
}
}
5.2 Sync Queue 节点出Queue方法
这里的出Queue的方法其实有两个:
新节点获取lock, 调用setHead抢占head, 并且剔除原head;节点因被中断或获取超时而进行 cancelled, 最后被剔除。
private void setHead(Node node){
head = node;
node.thread = null;
node.prev = null;
}
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
6. 独占Lock
6.1 独占方式获取lock主要流程
- 调用 tryAcquire 尝试性的获取锁(一般都是由子类实现), 成功的话直接返回
- tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
- 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
- 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt), 若是响应中断的则直接抛出异常
6.2 独占方式获取lock主要分成3类
- acquire 不响应中断的获取lock, 这里的不响应中断指的是线程被中断后会被唤醒, 并且继续获取lock,在方法返回时, 根据刚才的获取过程是否被中断来决定是否要自己中断一下(方法 selfInterrupt)
- doAcquireInterruptibly 响应中断的获取 lock, 这里的响应中断, 指在线程获取 lock 过程中若被中断, 则直接抛出异常
- doAcquireNanos 响应中断及超时的获取 lock, 当线程被中断, 或获取超时, 则直接抛出异常, 获取失败
6.3 独占的获取lock 方法 acquire
acquire(int arg):以独占模式获取对象,忽略中断。
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
- 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
- tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
- 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
- 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)。
6.4 循环获取lock 方法 acquireQueued
final boolean acquireQueued(final Node node, int arg){
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
主逻辑:
- 当前节点的前继节点是head节点时,先 tryAcquire获取一下锁, 成功的话设置新 head, 返回
- 第一步不成功, 检测是否需要sleep, 需要的话就sleep, 等待前继节点在释放lock时唤醒或通过中断来唤醒
- 整个过程可能需要blocking nonblocking 几次
6.5 支持中断获取lock 方法 doAcquireInterruptibly
private void doAcquireInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return;
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
throw new InterruptedException();
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
acquireInterruptibly(int arg): 以独占模式获取对象,如果被中断则中止。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
通过先检查中断的状态,然后至少调用一次tryAcquire,返回成功。否则,线程在排队,不停地阻塞与唤醒,调用tryAcquire直到成功或者被中断。
6.6 超时&中断获取lock 方法
tryAcquireNanos(int arg, long nanosTimeout):独占且支持超时模式获取: 带有超时时间,如果经过超时时间则会退出。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
if(nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if(nanosTimeout <= 0L){
return false;
}
if(shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold){
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){
throw new InterruptedException();
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
尝试以独占模式获取,如果中断和超时则放弃。实现时先检查中断的状态,然后至少调用一次tryAcquire。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg)|| doAcquireNanos(arg, nanosTimeout);
}
6.7 释放lock方法
释放 lock 流程:
- 调用子类的 tryRelease 方法释放获取的资源
- 判断是否完全释放lock(这里有 lock 重复获取的情况)
- 判断是否有后继节点需要唤醒, 需要的话调用unparkSuccessor进行唤醒
public final boolean release(int arg){
if(tryRelease(arg)){
Node h = head;
if(h != null && h.waitStatus != 0){
unparkSuccessor(h);
}
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName());
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName());
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
logger.info("unparkSuccessor s:"+s);
if (s != null)
LockSupport.unpark(s.thread);
}
7. 共享Lock
7.1 共享方式获取lock流程
- 调用 tryAcquireShared 尝试性的获取锁(一般都是由子类实现), 成功的话直接返回
- tryAcquireShared 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
- 在 Sync Queue 里面进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking
- 当获取失败, 则判断是否可以 block(block的前提是前继节点被打上 SIGNAL 标示)
- 共享与独占获取lock的区别主要在于 在共享方式下获取 lock 成功会判断是否需要继续唤醒下面的继续获取共享lock的节点(及方法 doReleaseShared)
7.2 共享方式获取lock主要分成3类
- acquireShared 不响应中断的获取lock, 这里的不响应中断指的是线程被中断后会被唤醒, 并且继续获取lock,在方法返回时, 根据刚才的获取过程是否被中断来决定是否要自己中断一下(方法 selfInterrupt)
- doAcquireSharedInterruptibly 响应中断的获取 lock, 这里的响应中断, 指在线程获取 lock 过程中若被中断, 则直接抛出异常
- doAcquireSharedNanos 响应中断及超时的获取 lock, 当线程被中断, 或获取超时, 则直接抛出异常, 获取失败
7.3 获取共享lock 方法 acquireShared
public final void acquireShared(int arg){
if(tryAcquireShared(arg) < 0){
doAcquireShared(arg);
}
}
7.4 获取共享lock 方法 doAcquireShared
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
if(interrupted){
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
interrupted = true;
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
7.5 获取共享lock 方法 doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
throw new InterruptedException();
}
}
}finally {
if(failed){
cancelAcquire(node);
}
}
}
7.6 获取共享lock 方法 doAcquireSharedNanos
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
if (nanosTimeout <= 0L){
return false;
}
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if(nanosTimeout <= 0L){
return false;
}
if(shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold){
LockSupport.parkNanos(this, nanosTimeout);
}
if(Thread.interrupted()){
throw new InterruptedException();
}
}
}finally {
if (failed){
cancelAcquire(node);
}
}
}
7.7 释放共享lock
当 Sync Queue中存在连续多个获取 共享lock的节点时, 会出现并发的唤醒后继节点(因为共享模式下获取lock后会唤醒近邻的后继节点来获取lock)。首先调用子类的 tryReleaseShared来进行释放 lock,然后判断是否需要唤醒后继节点来获取 lock
private void doReleaseShared(){
for(;;){
Node h = head;
if(h != null && h != tail){
int ws = h.waitStatus;
if(ws == Node.SIGNAL){
if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
}
else if(ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
continue;
}
}
if(h == head){
break;
}
}
}
8. 总结
本文主要讲过了抽象的队列式的同步器AQS的主要方法和实现原理。分别介绍了Node、Condition Queue、 Sync Queue、独占获取释放lock、共享获取释放lock的具体源码实现。AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它。
订阅最新文章,欢迎关注我的公众号
参考
- Java并发之AQS详解
- AbstractQueuedSynchronizer 源码分析 (基于Java 8)