Semaphore
可以用来控制访问临界资源的线程数,也是基于AQS框架的。
CLH首个线程结点唤醒后续线程节点
int r = tryAcquireShared(arg); // 可用“权限”数(state)判断 if (r >= 0) { // 向后广播,唤醒后继节点 // java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate setHeadAndPropagate(node, r); // 省略不重要代码... } // 释放共享锁方法 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // 可唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } // head状态为0表示刚唤醒的线程获取锁失败且重新阻塞设置为SIGNAL状态之间 // 或者获取锁成功且head是队尾,此时设置为PROPAGATE为了被其他线程检测到。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // head发生变化表示头结点被唤醒且头指针发生移动,所以没有变化就表示没有唤醒成功,再次 // 循环尝试唤醒或者队列没有可唤醒节点时退出。 if (h == head) break; } }
示例代码
/** * @author heoller * @date 2020/8/19 20:30 */ public class SemaphoreCode { private static int count = 0; public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(10); // AQS初始的state值设置为1,表示只能有一个线程可以访问临界资源 Semaphore semaphore = new Semaphore(1); // 创建一个线程池,大小为10 ExecutorService executorService = Executors.newFixedThreadPool(10); // 提交10个线程 for (int i = 0; i < 10; i++) { executorService.submit(() -> { try { // 获取线程执行“权限”,其中一个线程获取成功时,state值为0,其余线程获 // 取“权限”失败,会进入CLH阻塞队列并阻塞。 semaphore.acquire(); // 也可以设置最长等待时间 // semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS); for (int j = 0; j < 10000; j++) { count++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 线程释放“权限” semaphore.release(); countDownLatch.countDown(); } }); } countDownLatch.await(); executorService.shutdown(); System.out.println(count); } }