本文参考Java3y公众号以及《Java并发编程艺术》
AQS介绍
抽象队列同步器,是JUC包中最重要的类,是并发工具类的实现基础,可重入锁、读写锁、CountDownLatch、信号量、ThreadPoolExecutor,到处都有它的身影。
来看看Java3y大佬对备注的分析:
通读了一遍,可以总结出以下比较关键的信息:
- AQS其实就是一个可以给我们实现锁的框架
- 内部实现的关键是:先进先出的队列、state状态
- 定义了内部类ConditionObject
- 拥有两种线程模式
- 独占模式
- 共享模式
- 在LOCK包中的相关锁(常用的有ReentrantLock、 ReadWriteLock)都是基于AQS来构建
- 一般我们叫AQS为同步器
重要属性
同步状态state及相关方法
/** * The synchronization state. */ private volatile int state;//用volatile修饰保证其对各个线程的可见性。 /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//使用cas方法修改state。 }
FIFO队列/CLH队列
其实注释中对这个CLH队列描述的很详细,
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and * Hagersten) lock queue. CLH locks are normally used for * spinlocks. We instead use them for blocking synchronizers, but * use the same basic tactic of holding some of the control * information about a thread in the predecessor of its node. A * "status" field in each node keeps track of whether a thread * should block. A node is signalled when its predecessor * releases. Each node of the queue otherwise serves as a * specific-notification-style monitor holding a single waiting * thread. The status field does NOT control whether threads are * granted locks etc though. A thread may try to acquire if it is * first in the queue. But being first does not guarantee success; * it only gives the right to contend. So the currently released * contender thread may need to rewait. * * <p>To enqueue into a CLH lock, you atomically splice it in as new * tail. To dequeue, you just set the head field. * <pre> * +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+ * </pre>
大意就是说AQS中的等待队列是一个CLH锁队列的变形。CLH锁队列一般用于自旋锁。队列中的节点会有一个status用于跟踪一个线程是否应该被阻塞。
我们来看看Node源码:
static final class Node { // 共享 static final Node SHARED = new Node(); // 独占 static final Node EXCLUSIVE = null; // 线程被取消了 static final int CANCELLED = 1; // 后继线程需要唤醒 static final int SIGNAL = -1; // 等待condition唤醒 static final int CONDITION = -2; // 共享式同步状态获取将会无条件地传播下去(没看懂) static final int PROPAGATE = -3; // 初始为0,状态是上面的几种 volatile int waitStatus; // 前置节点 volatile Node prev; // 后继节点 volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
acquire方法
获取独占锁的过程是在acquire定义的,该方法用到了模板设计模式,由子类实现。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
注释大意是说这个方法可以用来实现lock方法,继承AQS需要你自己实现子类的tryAcquire方法,否则使用的时候会抛出异常throw new UnsupportedOperationException();
。
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的工作,逻辑如下:
AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取同步状态失败后->将线程构造成Node节点(addWaiter)->将Node节点添加到同步队列对尾(addWaiter)->节点以自旋的方法获取同步状态(acquireQueued)。在节点自旋获取同步状态时,只有其前驱节点是头节点时才会尝试获取同步状态,如果该节点的前驱不是头节点或者该节点的前驱节点是头节点且获取同步状态失败,则判断当前线程需要阻塞,如果需要阻塞则需要被唤醒过后才返回。如果尝试均失败,会自己中断。
在AQS中,addWaiter方法先会尝试快速在尾部添加节点,如果不可以,就会使用enq方法,该方法会死循环,调用compareAndSetTail方法,来确保正确的在尾部添加节点。
源码如下:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;//中断标记 for (;;) { //自旋开始 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//若node的前驱结点是头节点,就尝试获取同步状态 setHead(node); p.next = null; // help GC failed = false; //fail标记设置为false,表示获取成功 return interrupted; } // p != head 或者 p == head但是tryAcquire失败了,那么 // 应该阻塞当前线程等待前继唤醒。阻塞之前会再重试一次,还需要设置前继的waitStaus为SIGNAL。 // 线程会阻塞在parkAndCheckInterrupt方法中。 // parkAndCheckInterrupt返回可能是前继unpark或线程被中断。 if (shouldParkAfterFailedAcquire(p, node) && // 检查获取同步状态失败的节点的状态,如果线程应该阻塞则返回true parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//尝试使用cas快速添加节点 pred.next = node; return node; } } enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) {//死循环,会一直尝试添加节点直到成功添加为止。 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
release方法
释放独占锁的过程是在release定义的,该方法同样用到了模板设计模式,由子类实现。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先调用子类的tryRelease()方法释放锁,然后唤醒后继节点,在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为且不是作废状态,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到,则唤醒。
Condition内部类
(待补充)
ReentrantLock 可重入锁实现
根据源码备注的介绍,对可重入锁有以下的认识:
与synchronized具有相同的功能和语义,但具有更好的扩展性;
可以通过isHeldByCurrentThread()方法判断当前线程是否拥有锁;
支持公平锁,但会降低吞吐量;
不计时的tryLock()方法不符合公平锁的设定;
重要变量state,代表当前锁的状态,0表示没有线程获取锁,1表示被获取过,>1表示处于可重入状态(被重入了多少次)。
请在try代码块前调用lock,在finally块中unlock,这是推荐使用方式:
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
内部类Sync、NonfairSync、FairSync
三者的继承关系为:
构造方法
无参构造函数默认会构造一个非公平重入锁,你也可以通过传参true或false来指定创建一个公平或非公平锁。
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
非公平lock
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); }
重入锁的lock方法使用sync.lock()作为代理实现的。而Sync类中的lock方法为抽象方法,也就是说lock方法的实现是交给NonfairSync、FairSync实现的。公平不公平,都交给子类实现了。NonfairSync的lock方法源码如下:
//NonfairSync的lock方法 /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1))//尝试使用cas操作获取锁 setExclusiveOwnerThread(Thread.currentThread());//获取成功,那么将当前线程设置为锁的拥有者 else acquire(1);//若失败,调用AQS的acquire方法,acquire方法又会调用子类的tryAcquire方法。 }
若cas获取锁失败,会调用AQS的acquire方法,acquire方法又会调用子类NonfairSync的tryAcquire方法。NonfairSync的tryAcquire方法调用的是父类Sync的nonfairTryAcquire方法(有点绕哈),我们整理一个流程:AQS的acquire() --> NonfairSync的tryAcquire() --> Sync的nonfairTryAcquire(),怎么样,理解了吧。
源码如下:
//NonfairSync的tryAcquire方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //Sync的nonfairTryAcquire方法 /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//锁为空闲状态 if (compareAndSetState(0, acquires)) { //似乎和lock方法一样 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//当前线程已经获取了锁,那么锁是可重入的 int nextc = c + acquires;//暂存state加1 if (nextc < 0) // overflow溢出 throw new Error("Maximum lock count exceeded"); setState(nextc);//更新state return true; } return false; }
公平lock
FairLock的lock方法源码如下:
final void lock() { acquire(1); }
欸?好像有点不同,为什么不用cas先获取锁状态呢?废话!公平锁的获取异地那个是要加入CLH队列的,要不怎么能说是公平的,先来后到,简单的道理。
公平锁和非公平锁的区别仅在于在尝试获取同步状态前,要判断!hasQueuedPredecessors()
,如果该节点有前驱结点,显然是不能获取锁的,返回false。
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //唯一区别,先要判断CLH队列中,该节点之前已经没有前驱结点了 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
unlock方法
//重入锁中的unlcok方法 /** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
同样的,unlock方法也使用sync作为代理实现,调用的是父类AQS的release方法。从备注可以知道,如果当前线程是获取了锁的线程,那么获取的状态-1,如果可重入状态<0将会抛出异常。
在介绍AQS时我们说过release方法也使用了模板方法,会调用子类的tryRelease方法,Sync的tryRelease方法如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases;//重入状态-1 if (Thread.currentThread() != getExclusiveOwnerThread())//当前线程并不是获取了锁的线程? throw new IllegalMonitorStateException();//抛出异常 boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); //如果c = 0 ,说明当前锁已经完全被释放了,设置获取锁的线程为null } setState(c);//更新state return free; }
ReentrantReadWriteLock可重入读写锁
从备注中我们可以得知:
- 读写锁是一种共享锁,允许多个线程读,但是一个线程写的时候会阻塞其他所有读写操作。
- 内部维护两个锁,一个读锁(支持重入的共享锁),一个写锁(支持重入的排他锁)。
- 支持公平性选择、重进入(读线程可再次获取读锁,写线程可同时获得两种锁)、锁降级(遵循获取锁的顺序,写锁可降级为读锁)。
- 读写锁实现了ReadWriteLock接口,这个接口中定义了两个锁:读锁和写锁,都是Lock类型的。
- 如果一个集合在初始化之后,读多写少,那么是适合读写锁的。
- 读锁不支持Condition,写锁支持;
- 支持公平和非公平。
读写锁的状态表示
/* * Read vs write count extraction constants and functions. * Lock state is logically divided into two unsigned shorts: * The lower one representing the exclusive (writer) lock hold count, * and the upper the shared (reader) hold count. */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); //0x00010000 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //0x0000FFFF static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //0x0000FFFF /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读写状态设计:通过按位切割使用,低16位代表写状态,高16位代表读状态。
假设当前同步状态为S,那么写状态就是S & 0x0000FFFF,读状态为S >>> 16。写状态+1时,S+1;读状态+1时,S+(1<<16)。
根据状态划分可以得出一个结论:S不等于0时,当写状态等于0,那么读状态大于0,即读锁已被获取。
写锁的获取
写锁是一个支持重入的排他锁。如果当前线程已经获得写锁,则增加写状态。若当前线程在获取写锁时,读锁已被获取(读状态不为0)或者该线程不是已经获得写锁的线程,则当前线程进入等待状态。
同步状态的获取仍然使用Sync内部类的实例sync代理实现。
//写锁的lock方法 public void lock() { sync.acquire(1); } //Sync的tryAcquire方法 protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 当前线程必须是无锁状态才能获取写锁 * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 锁饱和,fail * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. * 该线程有资格才能获取写锁 */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) //状态划分结论,c不等于0时,当写状态w等于0,那么读状态一定大于0,即读锁已被获取 if (w == 0 || current != getExclusiveOwnerThread())//存在读锁,或者写锁已被获取且不是当前线程 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) //锁饱和 throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || 写锁的writerShouldBlock方法永远返回false !compareAndSetState(c, c + acquires)) //cas设置状态失败,那么返回false return false; setExclusiveOwnerThread(current); return true; }
写锁释放
不多说,仍然是代理。
//写锁的unlock方法 public void unlock() { sync.release(1); }
Sync类中的tryRelease方法:
/* * Note that tryRelease and tryAcquire can be called by * Conditions. So it is possible that their arguments contain * both read and write holds that are all released during a * condition wait and re-established in tryAcquire. */ protected final boolean tryRelease(int releases) { if (!isHeldExclusively())//当前线程是否是获取锁的线程 throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; //锁状态为0,释放读锁 if (free) setExclusiveOwnerThread(null); //设置锁获取线程为null setState(nextc);//更新state return free; }
读锁获取
读锁是一个支持重入的共享锁,能够被多个线程获取,再没有其他写线程访问(或者写状态为0)时,读锁总是会被成功获取。如果当前线程已经获得读锁,读锁状态+1。如果获取读锁时,写锁已被其他线程获取,则当前线程进入等待。JAVA6 后,为了统计当前线程获取读锁的状态,各自线程获取读锁的次数只能保存在ThreadLocal中。
和写锁不同,读锁的lock方法使用sync调用了AQS的acquireShared方法:
//读锁的lock方法 public void lock() { sync.acquireShared(1); } //AQS中的acquireShared方法 /** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //AQS中的doAcquireShared方法 /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//加入CLH队列 boolean failed = true; try { boolean interrupted = false; for (;;) {//自旋 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); //node的前驱结点是头节点,调用tryAcquire方法 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //AQS中的tryAcquireShared方法 protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //若存在写锁且获取写锁的线程不是当前线程 return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //读锁没有被阻塞,且读锁没有溢出,且cas更新状态成功 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
锁降级
锁降级指的是写锁降级为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能成为锁降级。
锁降级指的是把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
public class ProcessData { private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private static final Lock readLock = rwl.readLock(); private static final Lock writeLock = rwl.writeLock(); private volatile boolean update = false; public void processData() { readLock.lock(); if (!update) { //必须先释放读锁 readLock.unlock(); //锁降级从获取写锁开始 writeLock.lock(); try { if (!update) { //准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } //降级完成 } try { //使用数据的流程 } finally { readLock.unlock(); } } }
上述示例中,当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,而其他线程会被阻塞在读锁和写锁的lock()方法上。当前程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。
锁降级中读锁的获取是否必要呢?答案是必要的。主要原因是保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,则当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。原因也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程不可见。