AQS

AQS全称为AbstractQueuedSynchronizer;顾名思义就是一个抽象的(可以被继承复用),内存存在排队(竞争资源的对象排队)的同步器(对共享资源和线程进行同步管理)。


AQS的成员属性

status

private volatile int status

state作为竞争资源的标记位。多个线程想要去修改共享资源的时候,先来读status。

  1. 如果state显示目前共享资源空闲可以被获取,那么就可以原子的修改status成功后,表示当前进程占有资源,将status置为不可用,拒绝其他线程修改status。
  2. 通过将等待的线程加入到队列中,进行排队,队列头部的线程自旋地访问status,其他线程挂起,避免了大量线程自旋内耗。

head、tail

  • 使用FIFO的双向链表实现对等待线程的管理;
  • 其中的内部类Node,主要存储了线程对象 (thread)、节点在队列中的等待状态(waitStatus)、前后指针 (pre、next)等信息。
private transient volatile Node head;
private transient volatile Node tail;

补充:waitStatus状态

  • 0:节点初始化默认值或者节点已经释放锁
  • CANCELLED为1:当前节点获取锁的请求已经被取消了
  • SIGNAL为-1:当前节点的后续节点需要被唤醒
  • CONDITION为-2:当前节点正在等待某一个Condition对象
  • PROPAGATE为-3:传递共享模式下锁释放状态


AQS的方法

acquire、tryAcquire

通过tryAcquire尝试获取锁、如果失败需要通过addWaiter方法当前线程封装成Node,加入队列中,队列的head之后的一个节点在自旋CAS获取锁,其他线程都被挂起。挂起的线程等待执行release()释放锁资源。

// tryAcquire 尝试获取锁
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

// acquire
public final void acquire int(arg) {
  if(!tryAcquire(arg) && acquireQueued(addWaiter( Node.EXCLUSIVE), arg)) selfInterrupt();
}
acquiredQueued
  • 保证head之后的一个节点在自旋CAS获取锁,其他线程都已经被挂起或者正在被挂起。
addWaiter

将当前线程封装成Node,加入到等待队列中,需要将Node插入到队尾,会通过CAS操作将当前节点置为尾结点;

  1. 如果失败,说明尾结点已经被其他线程修改了,调用enq方法,不断重试的将节点插入到队列中。
  2. 如果成功,当前节点成为尾结点,注意两个节点建立连接的时候,首先是后节点的pre指向前节点,当后节点成为尾结点后,前节点的next才会指向后节点。
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  if(pred != null) {
    node.prev = pred;
    if(compareAndSetTail pred, node)){
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}
enq
  • 如果队列为初始化,那么就尝试初始化;
  • 如果尾插节点失败,那么就不断重试,直到插入成功为止。
private Node enq(final Node node) {
  for(;;) {
    Node t = tail;
    if(t == null) { // Must initialize
      if(compareAndSetHead(new Node()))
        tail = head;
    }else {
      node.prev = t;
      if(compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}


release

  • 通过tryRelease尝试释放锁,判断head节点的waitStatus,如果不为0,将其通过unparkSuccessor方法,通过CAS将其置为0,表示锁已释放;
  • 接下来就要唤醒等待队列里的其他节点,从后往前遍历搜索,找到除了head外最靠前且非CANCELED状态的Node,对其进行唤醒,尝试获取锁。
public final boolean release(int arg) {
  if(tryRelease(arg)) {
     Node h = head;
    if(h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}


\\ tryRelease
protected boolean tryRelease(int arg) {
    throw new UNsupportedOperationException();
    }

unparkSuccessor

为什么不直接从头开始搜索,而是从后往前搜索?

  1. 这里与addWaiter方法中,前后两个节点建立连接的顺序有关:
  2. 后节点的pre指向前节点
  3. 前节点的next才会指向后节点

这两步操作在多线程环境下并不是原子的,也就是说如果从前往后搜索,那么可能前节点next还未建立好,搜索会可能会中断。

private void unparkSuccessor(Node node) {
  /* 
  * If status is negative (i.e., possibly needing signal) try
  * to clear in anticipation of signalling. It is OK if this
  * fails or if status is changed by waiting thread.
  */
  int ws = node.waitStatus;
  if(ws < 0) {
    compareAndSetWaitStatus(node, ws, 0);
    
    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;
    if(s == null || s.waitStatus > 0) {
      s == null;
      for(Node t = tail; t != null && t != node; t = t.prev)
        if(t.waitStatus <= 0)
          s = t;
    }
    if(s != null)
      LockSupport.unpark(s.thread);


常用的辅助类

CountDownLatch

  • 相当于一个减法计数器,每执行一条线程执行计数减一,之后等待计数器归零才向下执行。也就是指定线程个数执行完毕再执行操作
  • countDownLatch.countDown(): 计数减一
  • countDownLatch.await(): 等待计数器归零才向下执行。

简单的测试代码:

CountDownLatch countDownLatch = new CountDownLatch(5);

for (int i = 0; i < 5; i++) {
  new Thread(() -> {
  System.out.println(Thread.currentThread().getName() + "-> GO out");
//  计数减一
  countDownLatch.countDown();
  },String.valueOf(i)).start();
}

//  等待计数器归零,之后再向下执行
countDownLatch.await();
System.out.println("关门");

实际的项目代码:

//  创建计数器
private CountDownLatch countDownLatch;

/***
* @author SHshuo
* 对外删除数据的接口
* @return
*/
public void deleteNode(List<SubordinateUser> subordinateUser, String enterpriseCode) throws InterruptedException {
//  创建计数器
CountDownLatch countDownLatch = new CountDownLatch(subordinateUser.size());
//  付给全局变量
  this.countDownLatch = countDownLatch;

if (subordinateUser.size() == 0) {
  throw new NullPointerException();
} else {
//  彻底摒弃stream开启线程的同步方法。
//  这里的多线程异步执行、parallelStream会根据核心线程自动启动多个线程实现异步调用
subordinateUser.parallelStream().forEach(p -> {
  deleteQuerySubordinateUser(p.getAccount(), enterpriseCode);
  });
}
//  等待所有线程执行完毕后返回,从而防止脏数据
countDownLatch.await();
}


 /***
* @author SHshuo
* 删除节点
* @return
*/
@Async
public void deleteQuerySubordinateUser(String account, String enterpriseCode) {
deleteLock.lock();
try {
System.out.println("正在执行" + Thread.currentThread().getName() + "线程");
             业务逻辑省略                 
} catch (Exception e) {
  e.printStackTrace();
} finally {
  deleteLock.unlock();
  countDownLatch.countDown();
  System.out.println("释放" + Thread.currentThread().getName() + "线程");
  }
}

CyclicBarrier

  • 相当于加法计数器,执行一条线程之后等待,一直累积到自定义的线程数量才会接着执行。也就是达到指定线程个数再执行操作
  • new CyclicBarrier(5,()-> System.out.println("关门"));累加一直到有5个线程,才会执行()-> System.out.println("关门")语句;
  • cyclicBarrier.await(): 等待

Semaphore

  • 信号量,相当于一个容器,用来限制线程的数量。同一时间只能有指定数量个得到线程。常用在限流、共享资源互斥使用的场景中
  • semaphore.acquire():获得资源,可能会造成阻塞。
  • semaphore.release():释放资源。
  • Semaphore这个概念很像同步队列的意思