Java锁(五)CyclicBarrier分析

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

1、CyclicBarrier 使用实例

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}

输出 1,2 或者 2,1
如果把 new CyclicBarrier(2)修改成 new CyclicBarrier(3)则主线程和子线程会永远等待,因为没有第三个线程执行 await 方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。代码如下:

public class CyclicBarrierTest2 {
    static CyclicBarrier c = new CyclicBarrier(2, new A());
    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {

        @Override
        public void run() {
            System.out.println(3);
        }

    }
}

输出 1、3、2

2、CyclicBarrier 源码分析

CyclicBarrier 底层是基于 ReentrantLock、AbstractQueuedSynchronizer,ConditionObject 来实现的,实现相对比较简单。了解前面的 ReentrantLock,对 AQS 的分析中已经指出了其数据结构,在这里不再累赘。
CyclicBarrier 的几个标志性的成员变量

/**
 * 循环栅栏的当前代
 */
private static class Generation {
    boolean broken = false;
}

/** 屏障的重入锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待状态直到触发*/
private final Condition trip = lock.newCondition();
/**  parties 数量 */
private final int parties;
/** 到达屏障时先触发的操作 */
private final Runnable barrierCommand;

/** 一个generation对象代表一代的屏障,
 * 就是说,如果generation对象不同,就代表进入了下一次的屏障,
 * 所以说,这个线程屏障是可循环的(Cyclic)
 */
private Generation generation = new Generation();

/**
 * count是计数器,如果有线程到达了屏障点,count就减1;
 * 直到count=0时,其它线程才可以向下执行
 */
private int count;

线程等待所有线程到达,触发栅栏

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

/**
 * 主要屏障代码,负责各种策略
 */
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取屏障的当前代信息
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        // 线程中断,中断屏障
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // count-1,到达0的时候,所有的线程向下执行
        int index = --count;
        if (index == 0) {  // 触发屏障的栅栏
            boolean ranAction = false;
            try {
                // 如果设置了barrierCommand,优先执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // 所有线程都到的屏障点
                // 更新屏障状态,唤醒其他线程,生成下一代屏障
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 循环直到触发屏障栅栏,或者中断,超时
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 被中断,设置栅栏中断标志
                if (g == generation && ! g.broken) {
                    // 设置broken中断标示
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            // 屏障被中断,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 不是栅栏的当前代(所有线程都到达,已经生成下一代的generation)
            if (g != generation)
                return index;

            // 超时后,中断屏障
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

/**
 * 中断屏障,唤醒其他线程
 */
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

/**
 * 更新屏障状态,唤醒其他线程,生成下一代屏障
 */
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

3、CyclicBarrier 和 CountDownLatch 的区别

CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置。所以 CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
CyclicBarrier 还提供其他有用的方法,比如 getNumberWaiting 方法可以获得 CyclicBarrier 阻塞的线程数量。isBroken 方法用来知道阻塞的线程是否被中断。

4、CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、CyclicBarrier 使用实例
  2. 2、CyclicBarrier 源码分析
  3. 3、CyclicBarrier 和 CountDownLatch 的区别
  4. 4、CyclicBarrier 的应用场景