AQS
AQS全称为AbstractQueuedSynchronizer;顾名思义就是一个抽象的(可以被继承复用),内存存在排队(竞争资源的对象排队)的同步器(对共享资源和线程进行同步管理)。
AQS的成员属性
status
private volatile int status
state作为竞争资源的标记位。多个线程想要去修改共享资源的时候,先来读status。
- 如果state显示目前共享资源空闲可以被获取,那么就可以原子的修改status成功后,表示当前进程占有资源,将status置为不可用,拒绝其他线程修改status。
- 通过将等待的线程加入到队列中,进行排队,队列头部的线程自旋地访问status,其他线程挂起,避免了大量线程自旋内耗。
head、tail
- 使用FIFO的双向链表实现对等待线程的管理;
- 其中的内部类Node,主要存储了线程对象 (thread)、节点在队列中的等待状态(waitStatus)、前后指针 (pre、next)等信息。
private transient volatile Node head;
private transient volatile Node tail;
补充:waitStatus状态
- 0:节点初始化默认值或者节点已经释放锁
- CANCELLED为1:当前节点获取锁的请求已经被取消了
- SIGNAL为-1:当前节点的后续节点需要被唤醒
- CONDITION为-2:当前节点正在等待某一个Condition对象
- PROPAGATE为-3:传递共享模式下锁释放状态
AQS的方法
acquire、tryAcquire
通过tryAcquire尝试获取锁、如果失败需要通过addWaiter方法当前线程封装成Node,加入队列中,队列的head之后的一个节点在自旋CAS获取锁,其他线程都被挂起。挂起的线程等待执行release()释放锁资源。
// tryAcquire 尝试获取锁
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}
// acquire
public final void acquire int(arg) {
  if(!tryAcquire(arg) && acquireQueued(addWaiter( Node.EXCLUSIVE), arg)) selfInterrupt();
}
acquiredQueued
- 保证head之后的一个节点在自旋CAS获取锁,其他线程都已经被挂起或者正在被挂起。
addWaiter
将当前线程封装成Node,加入到等待队列中,需要将Node插入到队尾,会通过CAS操作将当前节点置为尾结点;
- 如果失败,说明尾结点已经被其他线程修改了,调用enq方法,不断重试的将节点插入到队列中。
- 如果成功,当前节点成为尾结点,注意两个节点建立连接的时候,首先是后节点的pre指向前节点,当后节点成为尾结点后,前节点的next才会指向后节点。
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if(pred != null) {
    node.prev = pred;
    if(compareAndSetTail pred, node)){
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}
enq
- 如果队列为初始化,那么就尝试初始化;
- 如果尾插节点失败,那么就不断重试,直到插入成功为止。
private Node enq(final Node node) {
  for(;;) {
    Node t = tail;
    if(t == null) { // Must initialize
      if(compareAndSetHead(new Node()))
        tail = head;
    }else {
      node.prev = t;
      if(compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}
release
- 通过tryRelease尝试释放锁,判断head节点的waitStatus,如果不为0,将其通过unparkSuccessor方法,通过CAS将其置为0,表示锁已释放;
- 接下来就要唤醒等待队列里的其他节点,从后往前遍历搜索,找到除了head外最靠前且非CANCELED状态的Node,对其进行唤醒,尝试获取锁。
public final boolean release(int arg) {
  if(tryRelease(arg)) {
     Node h = head;
    if(h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
\\ tryRelease
protected boolean tryRelease(int arg) {
    throw new UNsupportedOperationException();
    }
unparkSuccessor
为什么不直接从头开始搜索,而是从后往前搜索?
- 这里与addWaiter方法中,前后两个节点建立连接的顺序有关:
- 后节点的pre指向前节点
- 前节点的next才会指向后节点
这两步操作在多线程环境下并不是原子的,也就是说如果从前往后搜索,那么可能前节点next还未建立好,搜索会可能会中断。
private void unparkSuccessor(Node node) {
  /* 
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling. It is OK if this
  * fails or if status is changed by waiting thread.
  */
  int ws = node.waitStatus;
  if(ws < 0) {
    compareAndSetWaitStatus(node, ws, 0);
    
    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;
    if(s == null || s.waitStatus > 0) {
      s == null;
      for(Node t = tail; t != null && t != node; t = t.prev)
        if(t.waitStatus <= 0)
          s = t;
    }
    if(s != null)
      LockSupport.unpark(s.thread);
常用的辅助类
CountDownLatch
- 相当于一个减法计数器,每执行一条线程执行计数减一,之后等待计数器归零才向下执行。也就是指定线程个数执行完毕再执行操作。
- countDownLatch.countDown(): 计数减一
- countDownLatch.await(): 等待计数器归零才向下执行。
简单的测试代码:
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
  new Thread(() -> {
  System.out.println(Thread.currentThread().getName() + "-> GO out");
//  计数减一
  countDownLatch.countDown();
  },String.valueOf(i)).start();
}
//  等待计数器归零,之后再向下执行
countDownLatch.await();
System.out.println("关门");
实际的项目代码:
//  创建计数器
private CountDownLatch countDownLatch;
/***
* @author SHshuo
* 对外删除数据的接口
* @return
*/
public void deleteNode(List<SubordinateUser> subordinateUser, String enterpriseCode) throws InterruptedException {
//  创建计数器
CountDownLatch countDownLatch = new CountDownLatch(subordinateUser.size());
//  付给全局变量
  this.countDownLatch = countDownLatch;
if (subordinateUser.size() == 0) {
  throw new NullPointerException();
} else {
//  彻底摒弃stream开启线程的同步方法。
//  这里的多线程异步执行、parallelStream会根据核心线程自动启动多个线程实现异步调用
subordinateUser.parallelStream().forEach(p -> {
  deleteQuerySubordinateUser(p.getAccount(), enterpriseCode);
  });
}
//  等待所有线程执行完毕后返回,从而防止脏数据
countDownLatch.await();
}
 /***
* @author SHshuo
* 删除节点
* @return
*/
@Async
public void deleteQuerySubordinateUser(String account, String enterpriseCode) {
deleteLock.lock();
try {
System.out.println("正在执行" + Thread.currentThread().getName() + "线程");
             业务逻辑省略                 
} catch (Exception e) {
  e.printStackTrace();
} finally {
  deleteLock.unlock();
  countDownLatch.countDown();
  System.out.println("释放" + Thread.currentThread().getName() + "线程");
  }
}
CyclicBarrier
- 相当于加法计数器,执行一条线程之后等待,一直累积到自定义的线程数量才会接着执行。也就是达到指定线程个数再执行操作。
- new CyclicBarrier(5,()-> System.out.println("关门"));累加一直到有5个线程,才会执行()-> System.out.println("关门")语句;
- cyclicBarrier.await(): 等待
Semaphore
- 信号量,相当于一个容器,用来限制线程的数量。同一时间只能有指定数量个得到线程。常用在限流、共享资源互斥使用的场景中。
- semaphore.acquire():获得资源,可能会造成阻塞。
- semaphore.release():释放资源。
- Semaphore这个概念很像同步队列的意思

 京公网安备 11010502036488号
京公网安备 11010502036488号