AQS
AQS全称为AbstractQueuedSynchronizer;顾名思义就是一个抽象的(可以被继承复用),内存存在排队(竞争资源的对象排队)的同步器(对共享资源和线程进行同步管理)。
AQS的成员属性
status
private volatile int status
state作为竞争资源的标记位。多个线程想要去修改共享资源的时候,先来读status。
- 如果state显示目前共享资源空闲可以被获取,那么就可以原子的修改status成功后,表示当前进程占有资源,将status置为不可用,拒绝其他线程修改status。
- 通过将等待的线程加入到队列中,进行排队,队列头部的线程自旋地访问status,其他线程挂起,避免了大量线程自旋内耗。
head、tail
- 使用FIFO的双向链表实现对等待线程的管理;
- 其中的内部类Node,主要存储了线程对象 (thread)、节点在队列中的等待状态(waitStatus)、前后指针 (pre、next)等信息。
private transient volatile Node head;
private transient volatile Node tail;
补充:waitStatus状态
- 0:节点初始化默认值或者节点已经释放锁
- CANCELLED为1:当前节点获取锁的请求已经被取消了
- SIGNAL为-1:当前节点的后续节点需要被唤醒
- CONDITION为-2:当前节点正在等待某一个Condition对象
- PROPAGATE为-3:传递共享模式下锁释放状态
AQS的方法
acquire、tryAcquire
通过tryAcquire尝试获取锁、如果失败需要通过addWaiter方法当前线程封装成Node,加入队列中,队列的head之后的一个节点在自旋CAS获取锁,其他线程都被挂起。挂起的线程等待执行release()释放锁资源。
// tryAcquire 尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// acquire
public final void acquire int(arg) {
if(!tryAcquire(arg) && acquireQueued(addWaiter( Node.EXCLUSIVE), arg)) selfInterrupt();
}
acquiredQueued
- 保证head之后的一个节点在自旋CAS获取锁,其他线程都已经被挂起或者正在被挂起。
addWaiter
将当前线程封装成Node,加入到等待队列中,需要将Node插入到队尾,会通过CAS操作将当前节点置为尾结点;
- 如果失败,说明尾结点已经被其他线程修改了,调用enq方法,不断重试的将节点插入到队列中。
- 如果成功,当前节点成为尾结点,注意两个节点建立连接的时候,首先是后节点的pre指向前节点,当后节点成为尾结点后,前节点的next才会指向后节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if(pred != null) {
node.prev = pred;
if(compareAndSetTail pred, node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq
- 如果队列为初始化,那么就尝试初始化;
- 如果尾插节点失败,那么就不断重试,直到插入成功为止。
private Node enq(final Node node) {
for(;;) {
Node t = tail;
if(t == null) { // Must initialize
if(compareAndSetHead(new Node()))
tail = head;
}else {
node.prev = t;
if(compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
release
- 通过tryRelease尝试释放锁,判断head节点的waitStatus,如果不为0,将其通过unparkSuccessor方法,通过CAS将其置为0,表示锁已释放;
- 接下来就要唤醒等待队列里的其他节点,从后往前遍历搜索,找到除了head外最靠前且非CANCELED状态的Node,对其进行唤醒,尝试获取锁。
public final boolean release(int arg) {
if(tryRelease(arg)) {
Node h = head;
if(h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
\\ tryRelease
protected boolean tryRelease(int arg) {
throw new UNsupportedOperationException();
}
unparkSuccessor
为什么不直接从头开始搜索,而是从后往前搜索?
- 这里与addWaiter方法中,前后两个节点建立连接的顺序有关:
- 后节点的pre指向前节点
- 前节点的next才会指向后节点
这两步操作在多线程环境下并不是原子的,也就是说如果从前往后搜索,那么可能前节点next还未建立好,搜索会可能会中断。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if(ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if(s == null || s.waitStatus > 0) {
s == null;
for(Node t = tail; t != null && t != node; t = t.prev)
if(t.waitStatus <= 0)
s = t;
}
if(s != null)
LockSupport.unpark(s.thread);
常用的辅助类
CountDownLatch
- 相当于一个减法计数器,每执行一条线程执行计数减一,之后等待计数器归零才向下执行。也就是指定线程个数执行完毕再执行操作。
- countDownLatch.countDown(): 计数减一
- countDownLatch.await(): 等待计数器归零才向下执行。
简单的测试代码:
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-> GO out");
// 计数减一
countDownLatch.countDown();
},String.valueOf(i)).start();
}
// 等待计数器归零,之后再向下执行
countDownLatch.await();
System.out.println("关门");
实际的项目代码:
// 创建计数器
private CountDownLatch countDownLatch;
/***
* @author SHshuo
* 对外删除数据的接口
* @return
*/
public void deleteNode(List<SubordinateUser> subordinateUser, String enterpriseCode) throws InterruptedException {
// 创建计数器
CountDownLatch countDownLatch = new CountDownLatch(subordinateUser.size());
// 付给全局变量
this.countDownLatch = countDownLatch;
if (subordinateUser.size() == 0) {
throw new NullPointerException();
} else {
// 彻底摒弃stream开启线程的同步方法。
// 这里的多线程异步执行、parallelStream会根据核心线程自动启动多个线程实现异步调用
subordinateUser.parallelStream().forEach(p -> {
deleteQuerySubordinateUser(p.getAccount(), enterpriseCode);
});
}
// 等待所有线程执行完毕后返回,从而防止脏数据
countDownLatch.await();
}
/***
* @author SHshuo
* 删除节点
* @return
*/
@Async
public void deleteQuerySubordinateUser(String account, String enterpriseCode) {
deleteLock.lock();
try {
System.out.println("正在执行" + Thread.currentThread().getName() + "线程");
业务逻辑省略
} catch (Exception e) {
e.printStackTrace();
} finally {
deleteLock.unlock();
countDownLatch.countDown();
System.out.println("释放" + Thread.currentThread().getName() + "线程");
}
}
CyclicBarrier
- 相当于加法计数器,执行一条线程之后等待,一直累积到自定义的线程数量才会接着执行。也就是达到指定线程个数再执行操作。
- new CyclicBarrier(5,()-> System.out.println("关门"));累加一直到有5个线程,才会执行()-> System.out.println("关门")语句;
- cyclicBarrier.await(): 等待
Semaphore
- 信号量,相当于一个容器,用来限制线程的数量。同一时间只能有指定数量个得到线程。常用在限流、共享资源互斥使用的场景中。
- semaphore.acquire():获得资源,可能会造成阻塞。
- semaphore.release():释放资源。
- Semaphore这个概念很像同步队列的意思