一、引入

  • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量或者任务的数量
  • 每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
  • CountDownLatch的方便之处在于,你可以在一个线程中使用,也可以在多个线程上使用,一切只依据状态值,这样便不会受限于任何的场景。

二、分析

在java5提供的并发包下,有一个AbstractQueuedSynchronizer抽象类,也叫AQS,此类根据大部分并发共性作了一些抽象,便于开发者实现如排他锁,共享锁,条件等待等更高级的业务功能。

它通过使用CAS和队列模型,出色的完成了抽象任务

  • 一开始,我们创建了一个CountDownLatch实例

  • 此时,AQS中,状态值state=2,对于 CountDownLatch 来说,state=2表示所有调用await方法的线程都应该阻塞,等到同一个latch被调用两次countDown后才能唤醒沉睡的线程。接着线程3和线程4执行了 await方法,这会的状态图如下:

  • 上面的通知状态是节点的属性,表示该节点出队后,必须唤醒其后续的节点线程。
  • 当线程1和线程2分别执行完latch.countDown方法后,会把state值置为0,
  • 此时,通过CAS成功置为0的那个线程将会同时承担起唤醒队列中第一个节点线程的任务,从上图可以看出,第一个节点即为线程3,当线程3恢复执行之后,其发现状态值为通知状态,所以会唤醒后续节点,即线程4节点,然后线程3继续做自己的事情,到这里,线程3和线程4都已经被唤醒,CountDownLatch功成身退。

三、使用场景一

需求

  • 可能刚从数据库读取了一批数据
  • 利用并发处理这批数据
  • 当所有的数据处理完成后,再去执行后面的操作

解决方案

  • 第一种:可以利用 join 的方法,但是在线程池中,比较麻烦
  • 第二种:利用线程池的awaitTermination,阻塞一段时间
  • 第三种利用CountDownLatch,每当任务完成一个,就计数器减一
public class CountDownLatchExample {

    private static final Random RANDOM = new Random(System.currentTimeMillis());


    private static ExecutorService executor = Executors.newFixedThreadPool(2);//线程池

    private static CountDownLatch latch ;
    public static void main(String[] args) throws InterruptedException {

        int[] data = query();//模拟从数据库查询的一批数据

        latch = new CountDownLatch(data.length);

        //让线程并发的处理数据
        for (int i = 0; i < data.length; i++) {
            executor.execute(new SimpleRunnable(data,i, latch));
        }
        latch.await();
        executor.shutdown();
// executor.awaitTermination(1, TimeUnit.HOURS);//利用线程池的等待机制,会阻塞住

        System.out.println("all of finish done!!");

        //等待全部线程处理完

    }

    static class SimpleRunnable implements Runnable{
        private final int [] data;

        private final int index;

        private final CountDownLatch latch;

        SimpleRunnable(int[] data, int index, CountDownLatch latch) {
            this.data = data;
            this.index = index;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int value = data[index];
            //数据处理逻辑
            if (value%2==0){
                data[index] = value*2;
            }else {
                data[index] = value*10;
            }
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + " is finished.");
        }
    }


    private static int[] query(){
        return new int[]{1,2,3,4,5};
    }
}

结果

pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
pool-1-thread-1 is finished.
pool-1-thread-2 is finished.
all of finish done!!

四、使用场景二

需求

  • 多个线程协同工作
  • 尝试多个线程需要等待其他线程的工作
  • 被唤醒后继续执行其他操作
public class CountDownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " Do some initial working.");
            try {
                Thread.sleep(1000);
                latch.await();
                System.out.println(Thread.currentThread().getName() + " Do other working.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " Do some initial working.");
            try {
                Thread.sleep(1000);
                latch.await();
                System.out.println(Thread.currentThread().getName() + " Do other working.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


        new Thread(() -> {
            System.out.println("asyn prepare for some data.");
            try {
                Thread.sleep(2000);
                System.out.println("Data prepare for done.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                latch.countDown();
            }
        }).start();


    }

}

五、API使用

构造方法只有一个

  • CountDownLatch(int count) :构造一个以给定计数

实例方法

  • public void await()

    • 当前线程等到锁存器计数到零
    • 可以被打断
  • public boolean await(long timeout,TimeUnit unit)

    • 等待一段时间
    • timeout - 等待的最长时间 ,unit - timeout参数的时间单位
    • 如果指定的等待时间过去,则返回值false
    • 如果计数达到零,则方法返回值为true
  • public void countDown()

    • 减少锁存器的计数,如果计数达到零,释放所有等待的线程
  • public long getCount()

    • 返回当前计数

六、给离散的平行任务增加逻辑层次关系

需求

  • 并发的从很多的数据库读取大量数据
  • 在读取数据的过程中,某个表可能会出现:数据丢失、数据精度丢失、数据大小不匹配
  • 需要进行对数据的各个情况进行检测,这个检测是并发的完成的
  • 所以需要控制如果一个表所有的情况检测完成,再进行后续的操作

解决

  • 利用CountDownLatch的计数器
  • 每当一个检测完成,计数器减一
  • 如果计数为0,执行后面操作
public class CountDownLatchExample {

    private static final Random RANDOM= new Random();

    public static void main(String[] args) throws Exception {
        Event [] events = {new Event(1),new Event(2)};

        ExecutorService service = Executors.newFixedThreadPool(5);

        for (Event event : events) {
            List<Table> tables = capture(event);
            for (Table table : tables) {
                TaskBatch taskBatch = new TaskBatch(2);
                TrustSourceColumns sourceColumns = new TrustSourceColumns(table, taskBatch);
                TrustSourceRecordCount recordCount = new TrustSourceRecordCount(table, taskBatch);
                service.submit(sourceColumns);
                service.submit(recordCount);
            }
        }
    }


    static class Event{
        private int id;

        Event(int id) {
            this.id = id;
        }
    }

    interface Watcher{
        void done(Table table);
    }

    static class TaskBatch implements Watcher{

        private final CountDownLatch latch;

        TaskBatch(int size) {
            this.latch = new CountDownLatch(size);
        }



        @Override
        public void done(Table table) {
            latch.countDown();
            if (latch.getCount() == 0){
                System.out.println("The table " + table.tableName + " finished work , " + table.toString());
            }
        }
    }

    static class Table{
        String tableName;
        long sourceRecordCount;
        long targetCount;
        String columnSchema = " tableName = a | column1Type = varchar";

        String targetColumnSchema  = "";

        public Table(String tableName,long sourceRecordCount) {
            this.tableName = tableName;
            this.sourceRecordCount = sourceRecordCount;

        }

        @Override
        public String toString() {
            return "Table{" +
                    "tableName='" + tableName + '\'' +
                    ", sourceRecordCount=" + sourceRecordCount +
                    ", targetCount=" + targetCount +
                    ", columnSchema='" + columnSchema + '\'' +
                    ", targetColumnSchema='" + targetColumnSchema + '\'' +
                    '}';
        }
    }

    private static List<Table> capture(Event event){
        List<Table> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Table("table-"+event.id + "-" +i,i*1000));
        }
        return list;
    }

    static class TrustSourceRecordCount implements Runnable{

        private final Table table;

        private final TaskBatch taskBatch;

        TrustSourceRecordCount(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetCount = table.sourceRecordCount;

// System.out.println("The table : " + table.tableName + " record count capture done and update.");
            taskBatch.done(table);

        }

    }


    static class TrustSourceColumns implements Runnable{

        private final Table table;

        private final TaskBatch taskBatch;

        TrustSourceColumns(Table table, TaskBatch taskBatch) {
            this.table = table;
            this.taskBatch = taskBatch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(RANDOM.nextInt(10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            table.targetColumnSchema = table.columnSchema;
// System.out.println("The table : " + table.tableName + " target columns capture done and update.");
            taskBatch.done(table);
        }

    }
}