Semaphore可以用来控制访问临界资源的线程数,也是基于AQS框架的。
CLH首个线程结点唤醒后续线程节点
int r = tryAcquireShared(arg);
// 可用“权限”数(state)判断
if (r >= 0) {
// 向后广播,唤醒后继节点
// java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
setHeadAndPropagate(node, r);
// 省略不重要代码...
}
// 释放共享锁方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 可唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// head状态为0表示刚唤醒的线程获取锁失败且重新阻塞设置为SIGNAL状态之间
// 或者获取锁成功且head是队尾,此时设置为PROPAGATE为了被其他线程检测到。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// head发生变化表示头结点被唤醒且头指针发生移动,所以没有变化就表示没有唤醒成功,再次
// 循环尝试唤醒或者队列没有可唤醒节点时退出。
if (h == head)
break;
}
}
示例代码
/**
* @author heoller
* @date 2020/8/19 20:30
*/
public class SemaphoreCode {
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
// AQS初始的state值设置为1,表示只能有一个线程可以访问临界资源
Semaphore semaphore = new Semaphore(1);
// 创建一个线程池,大小为10
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 提交10个线程
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
// 获取线程执行“权限”,其中一个线程获取成功时,state值为0,其余线程获
// 取“权限”失败,会进入CLH阻塞队列并阻塞。
semaphore.acquire();
// 也可以设置最长等待时间
// semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS);
for (int j = 0; j < 10000; j++) {
count++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程释放“权限”
semaphore.release();
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(count);
}
} 
京公网安备 11010502036488号