AQS简介
1.在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义。 2.AQS 则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。 3.AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法。
LockSupport
LockSupport可以理解为一个工具类。它的作用很简单,就是挂起和继续执行线程。它的常用的API如下:
- public static void park() : 如果没有可用许可,则挂起当前线程
- public static void unpark(Thread thread):给thread一个可用的许可,让它得以继续执行
因为单词park的意思就是停车,因此这里park()函数就表示让线程暂停。反之,unpark()则表示让线程继续执行。
需要注意的是,LockSupport本身也是基于许可的实现,如何理解这句话呢,请看下面的代码:
大家可以猜一下,park()之后,当前线程是停止,还是 可以继续执行呢?
答案是:可以继续执行。那是因为在park()之前,先执行了unpark(),进而释放了一个许可,也就是说当前线程有一个可用的许可。而park()在有可用许可的情况下,是不会阻塞线程的。
综上所述,park()和unpark()的执行效果和它调用的先后顺序没有关系。这一点相当重要,因为在一个多线程的环境中,我们往往很难保证函数调用的先后顺序(都在不同的线程中并发执行),因此,这种基于许可的做法能够最大限度保证程序不出错。
与park()和unpark()相比, 一个典型的反面教材就是Thread.resume()和Thread.suspend()。
看下面的代码:
Thread.currentThread().resume();
Thread.currentThread().suspend();
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
首先让线程继续执行,接着在挂起线程。这个写法和上面的park()的示例非常接近,但是运行结果却是截然不同的。在这里,当前线程就是卡死。
因此,使用park()和unpark()才是我们的首选。而在AbstractQueuedSynchronizer中,也正是使用了LockSupport的park()和unpark()操作来控制线程的运行状态的。
AQS的成员属性
/**
* Head of the wait queue, lazily initialized.
*/
// 同步队列是一个FIFO的双向链表
// 同步队列中的头节点
private transient volatile Node head;
/**
* Tail of the wait queue. After initialization, modified only via casTail.
*/
// 同步队列中的尾节点
private transient volatile Node tail;
/**
* The synchronization state.
*/
/*
用于判断共享资源是否正在被占用的标志位
volatile保证了线程之间的可见性
为什么用int为不是Boolean?
(boolea占用空间更小)
因为线程获取锁有两种模式:独占和共享
在共享模式下,可能有多个线程正在共享资源
*/
private volatile int state;
同步队列中Node节点
同步队列
当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。就数据结构而言,队列的实现方式无外乎两者:一是通过数组的形式,另外一种则是链表的形式。AQS中的同步队列则是通过链式方式进行实现。 接下来,很显然我们至少会抱有这样的疑问:1. 节点的数据结构是什么样的?2. 是单向还是双向?3. 是带头结点的还是不带头节点的?
在AQS有一个静态内部类Node,这是我们同步队列的每个具体节点。在这个类中有如下属性:
volatile int waitStatus; // 节点状态
volatile Node prev; // 当前节点的前驱节点
volatile Node next; // 当前节点的后继节点
volatile Thread thread; // 当前节点所包装的线程对象
Node nextWaiter; // 等待队列中的下一个节点
节点的状态如下:
int INITIAL = 0; // 初始状态 int CANCELLED = 1; // 当前节点从同步队列中取消 int SIGNAL = -1; // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继 节点的线程继续运行。 int CONDITION = -2; // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了 signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。 int PROPAGATE = -3; // 表示下一次共享式同步状态获取将会无条件地被传播下去。
现在我们知道了节点的数据结构类型,并且每个节点拥有其前驱和后继节点,很显然这是一个带头结点双向链表。
也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队 列中的线程进行通知等核心方法。
独占锁的获取
调用lock()方法是获取独占锁,获取锁失败后调用AQS提供的acquire(int arg)模板方法将当前线程加入同步队列,成功则线程执行。来看ReentrantLock源码.
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
lock方法使用CAS来尝试将同步状态改为1,如果成功则将同步状态持有线程置为当前线程。否则将调用AQS提供的 acquire()方法。
public final void acquire(int arg) {
// 再次尝试获取同步状态,如果成功则方法直接返回
// 如果失败则先调用addWaiter()方法再调用acquireQueued()方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg):再次尝试获取同步状态,成功直接方法退出,失败调用addWaiter(); addWaiter(Node.EXCLUSIVE), arg):将当前线程以指定模式(独占式、共享式)封装为Node节点后置入同步队列.
private Node addWaiter(Node mode) {
// 将线程以指定模式封装为Node节点
Node node = new Node(Thread.currentThread(), mode);
// 获取当前队列的尾节点
Node pred = tail;
// 若尾节点不为空
if (pred != null) {
node.prev = pred;
// 使用CAS将当前节点尾插到同步队列中
if (compareAndSetTail(pred, node)) {
pred.next = node;
// CAS尾插成功,返回当前Node节点
return node;
}
}
// 尾节点为空 || CAS尾插失败
enq(node);
return node;
}
分析上面的注释。程序的逻辑主要分为两个部分:1. 当前同步队列的尾节点为null,调用方法enq()插入;2. 当前队列的尾节点不为null,则采用尾插入(compareAndSetTail()方法)的方式入队。另外还会有另外一个问题: 如果 if (compareAndSetTail(pred, node))为false怎么办?会继续执行到enq()方法,同时很明显compareAndSetTail() 是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。
因此,经过我们这样的分析,enq()方法可能承担两个任务:
- 处理当前同步队列尾节点为null时进行入队操作;
- 如果CAS尾插入节点失败后负责自旋进行尝试
private Node enq(final Node node) {
// 直到将当前节点插入同步队列成功为止
for (;;) {
Node t = tail;
// 初始化同步队列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不断CAS将当前节点尾插入同步队列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在tail==null时,即当前线程是第一次插入同步队列。 compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)死循环中不断尝试,直至成功return返回为止。
因此,对enq()方法可以做这样的总结:
-
在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列头结点的初始化;
-
自旋不断尝试CAS尾插入节点直至成功为止。
现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了。那么紧接着会有下一个问题:在同步队列中的节点(线程)会做什么事情来保证自己能够有机会获得独占式锁?
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 获取同步状态成功条件
// 前驱节点为头结点并且获取同步状态成功
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 删除原来的头结点
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire(p, node)判断当前节点是否需要挂起
// parkAndCheckInterrupt())通过调用native调用操作系统的原语将当前线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 获取失败将当前节点取消
if (failed)
cancelAcquire(node);
}
}
独占锁的释放(release()方法)
独占锁的释放调用unlock方法,而该方法实际调用了AQS的release方法.
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 头结点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 后继节点不为null时唤醒
LockSupport.unpark(s.thread);
}
首先获取头节点的后继节点,当后继节点不为空的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。
独占式锁获取与释放总结
- 线程获取锁失败,将线程调用addWaiter()封装成Node进行入队操作。addWaiter()中enq()方法完成对同步队列的头节点初始化以及CAS尾插失败后的重试处理。
- 入队之后排队获取锁的核心方法acquireQueued(),节点排队获取锁是一个自旋过程。当且仅当当前节点的前驱节点为头节点并且获取同步状态时,节点出队并且该节点引用的线程获取到锁。否则不满足条件时会不断自旋将前驱节点的状态置为SIGNAL后调用LockSupport.part()将当前线程阻塞。
- 释放锁时会唤醒后继结点(后继结点不为null)。
独占锁特性
可中断式获取锁
可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly()方法。
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 线程获取锁失败
doAcquireInterruptibly(arg);
}
在获取同步状态失败后就会调用doAcquireInterruptibly()方法
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取锁响应中断原理与acquire()几乎一样,唯一区别在于当parkAndCheckInterrupt()返回true时表示线程阻塞时被中断,抛出中断异常后线程退出。
超时等待式获取锁(tryAcquireNanos()方法)
通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:
- 在超时时间内,当前线程成功获取了锁;
- 当前线程在超时时间内被中断;
- 超时时间结束,仍未获得锁返回false。
该方法会调用AQS的方法tryAcquireNanos(),源码为:
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
// 实现超时等待的效果
doAcquireNanos(arg, nanosTimeout);
}
最终是靠doAcquireNanos()方法实现超时等待的效果
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
超时获取锁逻辑与可中断获取锁基本一致,获取锁失败后,增加了一个时间处理。如果当前时间超过截止时间,线程不在等待,直接退出,返回false。否则将线程阻塞置为等待状态排队获取锁。