AQS


11. AQS

AQS,AbstractQueuedSynchronizer,抽象队列同步器,是一个抽象类。

  • 是用来构建锁和同步器的框架。基于 AQS 实现的锁:
    • ReentrantLock
    • ReentrantReadWriteLock
    • CountDownLatch

还有一个类似的抽象类,AbstractQueuedLongSynchronizer,是把 state 变成了 long 类型。能支持更多的业务资源数量。

private volatile long state;

10.0 继承关系

AQS 继承了 AbstractOwnableSynchronizer AOS,该类用于指明锁与持有锁的线程之间的关系是独占模式

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    // 独占锁的持有者线程
    private transient Thread exclusiveOwnerThread;

    // set, get 方法
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

10.1 源码分析

AQS 的 CLS 队列中的结点为拥有线程的结点 Node

  • 头结点 head 指向的结点就是当前占有共享资源线程对应的结点

10.1.1 state 锁资源状态

state 用来标识锁资源的状态。

  • volatile 修饰,代表对其的操作是原子操作

AQS 使用 state 来表示同步状态,通过内置的 FIFO 队列来完成线程排队,并通过 CAS 操作来修改 state 的值。

private volatile int state;

// 获取
protected final int getState() {
    return state;
}
// 更新
protected final void setState(int newState) {
    state = newState;
}
// 使用 CAS 操作更新:只有在旧值等于 expect 时才会更新
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

10.1.2 Node 内部类

AQS 的内部类 Node 提供了同步资源的共享模式:

  • 独占模式 exclusiveReentrantLock
  • 共享模式 share: Semaphore, CountDownLatch

可以同时实现两种模式:ReentrantReadWriteLock

同步队列与等待队列

通过 Node 实现了两种队列:

  1. 通过 prevnext 实现 线程同步双向队列 (CLH 队列)
  2. 通过 nextWaiter 实现 线程等待单向队列
    • 线程等待队列有可以有多个,由 Condition 对象维护。

同步队列存放着正在竞争同步资源的线程的引用,等待队列则存放着待唤醒的线程的引用。

  • 当线程获取同步状态失败时,同步器会将当前线程构造成 Node 结点放入同步队列中,同步队列的首节点即为正在持有同步状态的线程结点。
  • 每个 Condition 对象都对应一个等待队列。

static final class Node {
    // 标记一个线程结点在共享模式下等待
    static final Node SHARED = new Node();
    // 标记一个线程结点在独占模式下等待,值为 null
    static final Node EXCLUSIVE = null; 

    // 下面的常量都是 waitStatus 的各种状态:

    // 表示该线程结点已释放
    static final int CANCELLED = 1; 

    // 表示当该节点的前驱结点释放锁时,当前线程结点可以去获取锁
    static final int SIGNAL = -1;

    /**
     * 表示该线程在 condition 队列中阻塞(Condition有使用),
     * 当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的结点将从
     * 等待队列转移到同步队列中,等待获取同步锁。
     */ 
    static final int CONDITION = -2;

    // 与共享模式有关,表示有资源可用,新head结点需要继续唤醒后继结点
    static final int PROPAGATE = -3;

    /**
     * waitStatus:等待状态,指的是当前 Node 节点中存放的线程的等待状态,
     * 等待状态值就是上面的四个状态值:CANCELLED、SIGNAL、CONDITION、PROPAGATE
     */
    volatile int waitStatus;

    volatile Node prev; // 前驱结点
    volatile Node next; // 后继结点

    volatile Thread thread; // 结点对应的被阻塞线程的引用

    // 等待队列里下一个等待条件的结点,当前结点与下一个节点的所属模式是相同的(共享模式,独占模式)
    Node nextWaiter;


    // 判断结点是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 构造器,线程竞争锁失败时,创建新的结点,addWaiter() 会用到
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

10.1.3 主要方法

AQS 基于模板方法模式。子类需要实现的方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
获取资源 acquire
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

        selfInterrupt();
}

首先 tryAcquire(arg) 尝试获取,如果失败了则 addWaiter(Node.EXCLUSIVE) 把线程插入到等待队列的尾部。

  • addWaiter(Node.EXCLUSIVE) 通过 CAS 操作来保证线程安全
private Node addWaiter(Node mode) {
    // 生成该线程对应的Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // 将Node插入队列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 使用CAS尝试,如果成功就返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果等待队列为空或者上述CAS失败,再自旋CAS插入
    enq(node);
    return node;
}

// 自旋CAS插入等待队列
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

然后在队列中尝试获取资源。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
            if (p == head && tryAcquire(arg)) {
                // 拿到资源后,将head指向该结点。
                // 所以head所指的结点,就是当前获取到资源的那个结点或null。
                setHead(node); 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果尝试获取资源失败,则线程挂起,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

AQS 尝试获取同步资源的流程图:

资源的释放
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 如果状态是负数,尝试把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 得到头结点的后继结点head.next
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于0
    // 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 等待队列中所有还有用的结点,都向前移动
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果后继结点不为空,则 unpark() 这个线程,得到锁资源
    if (s != null)
        LockSupport.unpark(s.thread);
}

10.1.4 ConditionObject 条件对象

Condition 对象是通过 Lock 接口的 newCondition() 来创建的。如果 Lock 的实现类是 ReentrantLock,则 Condition 对象是通过调用 ReentrantLockSync 锁(该锁实现了 AQS 抽象类)的 newCondition() 方法来创建的,底层是创建了 AQS 中的 ConditionObject 对象。

ConditionObject 对象封装了线程等待单向队列的头尾指针。每个 ConditionObject 对象都对应一个线程等待单向队列。

  • 队列中的元素是 AQS 的 Node 对象,每个 Node 对应一个等待共享资源的线程,该单向队列由 nextWaiter 指针来维护。
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    // 条件队列的头
    private transient Node firstWaiter;
    // 条件队列的尾
    private transient Node lastWaiter;

    // other methods...   
}

文章作者: Yu Yang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Yu Yang !
 上一篇
8.volatile 关键字 8.volatile 关键字
8. volatile8.0 为什么存在内存可见性问题 -> 缓存一致性问题可见性是指某线程修改了一个变量的值,新值对于其他线程是立即可见的。 为了提高代码执行速度,CPU 先把资源从内存加载到 CPU 缓存中(L1, L2等),但操
下一篇 
进程与线程 进程与线程
0. 进程与线程进程是应用程序在某块独立内存上的运行活动,是系统进行资源分配的基本单位。各个进程之间互不干扰,包括程序段、数据、PCB。 程序段:程序运行的代码; 数据段:程序运行期的相关数据; PCB 进程控制块:描述进程的基本情况和运
  目录