11. AQS
AQS,AbstractQueuedSynchronizer
,抽象队列同步器,是一个抽象类。
- 是用来构建锁和同步器的框架。基于 AQS 实现的锁:
ReentrantLock
ReentrantReadWriteLock
CountDownLatch
还有一个类似的抽象类,AbstractQueuedLongSynchronizer
,是把 state
变成了 long
类型。能支持更多的业务资源数量。
private volatile long state;
10.0 继承关系
AQS 继承了 AbstractOwnableSynchronizer
AOS,该类用于指明锁与持有锁的线程之间的关系是独占模式。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 独占锁的持有者线程
private transient Thread exclusiveOwnerThread;
// set, get 方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
10.1 源码分析
AQS 的 CLS 队列中的结点为拥有线程的结点 Node
。
- 头结点
head
指向的结点就是当前占有共享资源线程对应的结点
10.1.1 state 锁资源状态
state
用来标识锁资源的状态。
volatile
修饰,代表对其的操作是原子操作
AQS 使用 state
来表示同步状态,通过内置的 FIFO 队列来完成线程排队,并通过 CAS 操作来修改 state
的值。
private volatile int state;
// 获取
protected final int getState() {
return state;
}
// 更新
protected final void setState(int newState) {
state = newState;
}
// 使用 CAS 操作更新:只有在旧值等于 expect 时才会更新
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
10.1.2 Node 内部类
AQS 的内部类 Node
提供了同步资源的共享模式:
- 独占模式 exclusive:
ReentrantLock
- 共享模式 share:
Semaphore
,CountDownLatch
可以同时实现两种模式:ReentrantReadWriteLock
同步队列与等待队列
通过 Node
实现了两种队列:
- 通过
prev
和next
实现 线程同步双向队列 (CLH 队列) - 通过
nextWaiter
实现 线程等待单向队列- 线程等待队列有可以有多个,由
Condition
对象维护。
- 线程等待队列有可以有多个,由
同步队列存放着正在竞争同步资源的线程的引用,等待队列则存放着待唤醒的线程的引用。
- 当线程获取同步状态失败时,同步器会将当前线程构造成 Node 结点放入同步队列中,同步队列的首节点即为正在持有同步状态的线程结点。
- 每个
Condition
对象都对应一个等待队列。
static final class Node {
// 标记一个线程结点在共享模式下等待
static final Node SHARED = new Node();
// 标记一个线程结点在独占模式下等待,值为 null
static final Node EXCLUSIVE = null;
// 下面的常量都是 waitStatus 的各种状态:
// 表示该线程结点已释放
static final int CANCELLED = 1;
// 表示当该节点的前驱结点释放锁时,当前线程结点可以去获取锁
static final int SIGNAL = -1;
/**
* 表示该线程在 condition 队列中阻塞(Condition有使用),
* 当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的结点将从
* 等待队列转移到同步队列中,等待获取同步锁。
*/
static final int CONDITION = -2;
// 与共享模式有关,表示有资源可用,新head结点需要继续唤醒后继结点
static final int PROPAGATE = -3;
/**
* waitStatus:等待状态,指的是当前 Node 节点中存放的线程的等待状态,
* 等待状态值就是上面的四个状态值:CANCELLED、SIGNAL、CONDITION、PROPAGATE
*/
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的被阻塞线程的引用
// 等待队列里下一个等待条件的结点,当前结点与下一个节点的所属模式是相同的(共享模式,独占模式)
Node nextWaiter;
// 判断结点是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 构造器,线程竞争锁失败时,创建新的结点,addWaiter() 会用到
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
10.1.3 主要方法
AQS 基于模板方法模式。子类需要实现的方法:
isHeldExclusively()
:该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
获取资源 acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先 tryAcquire(arg)
尝试获取,如果失败了则 addWaiter(Node.EXCLUSIVE)
把线程插入到等待队列的尾部。
addWaiter(Node.EXCLUSIVE)
通过 CAS 操作来保证线程安全
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
然后在队列中尝试获取资源。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果尝试获取资源失败,则线程挂起,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS 尝试获取同步资源的流程图:
资源的释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,则 unpark() 这个线程,得到锁资源
if (s != null)
LockSupport.unpark(s.thread);
}
10.1.4 ConditionObject 条件对象
Condition
对象是通过 Lock
接口的 newCondition()
来创建的。如果 Lock
的实现类是 ReentrantLock
,则 Condition
对象是通过调用 ReentrantLock
中 Sync
锁(该锁实现了 AQS 抽象类)的 newCondition()
方法来创建的,底层是创建了 AQS 中的 ConditionObject
对象。
ConditionObject
对象封装了线程等待单向队列的头尾指针。每个 ConditionObject
对象都对应一个线程等待单向队列。
- 队列中的元素是 AQS 的
Node
对象,每个Node
对应一个等待共享资源的线程,该单向队列由nextWaiter
指针来维护。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列的头
private transient Node firstWaiter;
// 条件队列的尾
private transient Node lastWaiter;
// other methods...
}