引出

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。 如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。

API使用

构造方法

public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
  • parties 是参与线程的个数
  • 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务

重要方法

public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
  • 线程调用 await() 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

其他方法

public void reset()
  • 将屏障重置为初始状态。 如果任何一方正在等待屏障,他们将返回BrokenBarrierException
  • 这样就可以重复利用这个屏障

CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的。CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的

利用CountDownLatch实现回调函数

实现:

  • 在每个线程使计数器减一的时候,利用getCount判断,当前是否所有线程任务执行完成
public class CyclicBarrierTest {
    static class MyCountDownLatch extends CountDownLatch{
        private final Runnable runnable;

        public MyCountDownLatch(int count,Runnable runnable) {
            super(count);
            this.runnable = runnable;
        }

        @Override
        public void countDown() {
            super.countDown();
            if (getCount()==0){
                this.runnable.run();
            }
        }

    }

    public static void main(String[] args) {

        final MyCountDownLatch latch = new MyCountDownLatch(2, ()->{
            System.out.println("All of work finish done. This is call back.");
        });


        new Thread(){
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                    System.out.println(getName() +  " finished.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(getName() +  " finished.");
            }
        }.start();

    }
}