一、引入
- countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
- 是通过一个
计数器
来实现的,计数器的初始值是线程的数量或者任务的数量。 - 每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
- CountDownLatch的方便之处在于,你可以在一个线程中使用,也可以在多个线程上使用,一切只依据状态值,这样便不会受限于任何的场景。
二、分析
在java5提供的并发包下,有一个AbstractQueuedSynchronizer抽象类,也叫AQS,此类根据大部分并发共性作了一些抽象,便于开发者实现如排他锁,共享锁,条件等待等更高级的业务功能。
它通过使用CAS和队列模型,出色的完成了抽象任务
- 一开始,我们创建了一个CountDownLatch实例
- 此时,AQS中,状态值state=2,对于 CountDownLatch 来说,state=2表示所有调用await方法的线程都应该阻塞,等到同一个latch被调用两次countDown后才能唤醒沉睡的线程。接着线程3和线程4执行了 await方法,这会的状态图如下:
- 上面的通知状态是节点的属性,表示该节点出队后,必须唤醒其后续的节点线程。
- 当线程1和线程2分别执行完latch.countDown方法后,会把state值置为0,
- 此时,
通过CAS成功置为0的那个线程将会同时承担起唤醒队列中第一个节点线程的任务
,从上图可以看出,第一个节点即为线程3,当线程3恢复执行之后,其发现状态值为通知状态,所以会唤醒后续节点,即线程4节点,然后线程3继续做自己的事情,到这里,线程3和线程4都已经被唤醒,CountDownLatch功成身退。
三、使用场景一
需求
- 可能刚从数据库读取了一批数据
- 利用并发处理这批数据
- 当所有的数据处理完成后,再去执行后面的操作
解决方案
- 第一种:可以利用 join 的方法,但是在线程池中,比较麻烦
- 第二种:利用线程池的awaitTermination,阻塞一段时间
- 第三种:利用CountDownLatch,每当任务完成一个,就计数器减一
public class CountDownLatchExample {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static ExecutorService executor = Executors.newFixedThreadPool(2);//线程池
private static CountDownLatch latch ;
public static void main(String[] args) throws InterruptedException {
int[] data = query();//模拟从数据库查询的一批数据
latch = new CountDownLatch(data.length);
//让线程并发的处理数据
for (int i = 0; i < data.length; i++) {
executor.execute(new SimpleRunnable(data,i, latch));
}
latch.await();
executor.shutdown();
// executor.awaitTermination(1, TimeUnit.HOURS);//利用线程池的等待机制,会阻塞住
System.out.println("all of finish done!!");
//等待全部线程处理完
}
static class SimpleRunnable implements Runnable{
private final int [] data;
private final int index;
private final CountDownLatch latch;
SimpleRunnable(int[] data, int index, CountDownLatch latch) {
this.data = data;
this.index = index;
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(RANDOM.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = data[index];
//数据处理逻辑
if (value%2==0){
data[index] = value*2;
}else {
data[index] = value*10;
}
latch.countDown();
System.out.println(Thread.currentThread().getName() + " is finished.");
}
}
private static int[] query(){
return new int[]{1,2,3,4,5};
}
}
结果
pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
all of finish done!!
四、使用场景二
需求
- 多个线程协同工作
- 尝试多个线程需要等待其他线程的工作
- 被唤醒后继续执行其他操作
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Do some initial working.");
try {
Thread.sleep(1000);
latch.await();
System.out.println(Thread.currentThread().getName() + " Do other working.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " Do some initial working.");
try {
Thread.sleep(1000);
latch.await();
System.out.println(Thread.currentThread().getName() + " Do other working.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
System.out.println("asyn prepare for some data.");
try {
Thread.sleep(2000);
System.out.println("Data prepare for done.");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
}).start();
}
}
五、API使用
构造方法只有一个
CountDownLatch(int count)
:构造一个以给定计数
实例方法
-
public void await()
- 当前线程等到锁存器计数到零
- 可以被打断
-
public boolean await(long timeout,TimeUnit unit)
- 等待一段时间
- timeout - 等待的最长时间 ,unit - timeout参数的时间单位
- 如果指定的等待时间过去,则返回值false
- 如果计数达到零,则方法返回值为true
-
public void countDown()
- 减少锁存器的计数,如果计数达到零,释放所有等待的线程。
-
public long getCount()
- 返回当前计数
六、给离散的平行任务增加逻辑层次关系
需求
- 并发的从很多的数据库读取大量数据
- 在读取数据的过程中,某个表可能会出现:
数据丢失、数据精度丢失、数据大小不匹配
- 需要进行对数据的各个情况进行检测,这个检测是并发的完成的
- 所以需要控制如果一个表所有的情况检测完成,再进行后续的操作
解决
- 利用
CountDownLatch
的计数器 - 每当一个检测完成,
计数器减一
- 如果计数为0,执行后面操作
public class CountDownLatchExample {
private static final Random RANDOM= new Random();
public static void main(String[] args) throws Exception {
Event [] events = {new Event(1),new Event(2)};
ExecutorService service = Executors.newFixedThreadPool(5);
for (Event event : events) {
List<Table> tables = capture(event);
for (Table table : tables) {
TaskBatch taskBatch = new TaskBatch(2);
TrustSourceColumns sourceColumns = new TrustSourceColumns(table, taskBatch);
TrustSourceRecordCount recordCount = new TrustSourceRecordCount(table, taskBatch);
service.submit(sourceColumns);
service.submit(recordCount);
}
}
}
static class Event{
private int id;
Event(int id) {
this.id = id;
}
}
interface Watcher{
void done(Table table);
}
static class TaskBatch implements Watcher{
private final CountDownLatch latch;
TaskBatch(int size) {
this.latch = new CountDownLatch(size);
}
@Override
public void done(Table table) {
latch.countDown();
if (latch.getCount() == 0){
System.out.println("The table " + table.tableName + " finished work , " + table.toString());
}
}
}
static class Table{
String tableName;
long sourceRecordCount;
long targetCount;
String columnSchema = " tableName = a | column1Type = varchar";
String targetColumnSchema = "";
public Table(String tableName,long sourceRecordCount) {
this.tableName = tableName;
this.sourceRecordCount = sourceRecordCount;
}
@Override
public String toString() {
return "Table{" +
"tableName='" + tableName + '\'' +
", sourceRecordCount=" + sourceRecordCount +
", targetCount=" + targetCount +
", columnSchema='" + columnSchema + '\'' +
", targetColumnSchema='" + targetColumnSchema + '\'' +
'}';
}
}
private static List<Table> capture(Event event){
List<Table> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(new Table("table-"+event.id + "-" +i,i*1000));
}
return list;
}
static class TrustSourceRecordCount implements Runnable{
private final Table table;
private final TaskBatch taskBatch;
TrustSourceRecordCount(Table table, TaskBatch taskBatch) {
this.table = table;
this.taskBatch = taskBatch;
}
@Override
public void run() {
try {
Thread.sleep(RANDOM.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
table.targetCount = table.sourceRecordCount;
// System.out.println("The table : " + table.tableName + " record count capture done and update.");
taskBatch.done(table);
}
}
static class TrustSourceColumns implements Runnable{
private final Table table;
private final TaskBatch taskBatch;
TrustSourceColumns(Table table, TaskBatch taskBatch) {
this.table = table;
this.taskBatch = taskBatch;
}
@Override
public void run() {
try {
Thread.sleep(RANDOM.nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
table.targetColumnSchema = table.columnSchema;
// System.out.println("The table : " + table.tableName + " target columns capture done and update.");
taskBatch.done(table);
}
}
}