CyclicBarrier源码分析
CountDownLatch源码分析这篇文章分析了基于AQS共享锁实现的CountDownLatch
,下面分析一下与CountDownLatch
类似的同步工具CyclicBarrier
。
CyclicBarrier
的字面意思是可循环使用(Cyclic
)的屏障(Barrier
)。它使得一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续运行。
一、与CountDownLatch的区别
1.将count值递减的线程
在CountDownLatch
中,执行countDown
方法的线程和执行await
方法的线程不是一类线程。例如,线程M,N需要等待线程A。B,C,D,E执行完成后才能继续往下执行,则线程A,B,C,D,E执行完成后都将调用countDown
方法,使得最后count
变为0,最后一个将count
值减为0的线程调用的tryReleaseShared
方法会成功返回true,从而调用doReleaseShared()
唤醒所有在sync queue
中等待共享锁的线程,这里对应的就是M,N。所以,在CountDownLatch
中,执行countDown
的线程不会被挂起,调用await
方法的线程会阻塞等待共享锁。
而在CyclicBarrier
中,将count
值递减的线程和执行await
方法的线程是一类线程,它们在执行完递减count
的操作后,如果count
值不为0,则可能同时被挂起。例如,线程A,B,C,D,E需要互相等待,保证所有线程都执行完了之后才能一起通过。
这就好像同一个班级出去春游,到一个景区后先自由活动,一段时间后在指定的地点集合,然后去下一个景点。这里这个指定集合的地点就是CyclicBarrier
中的barrier
,每一个人到达后都会执行await
方法先将需要继续等待的人数(count
)减1,然后(在条件队列上)挂起等待,当最后一个人到了之后,发现人已经到齐了,则他负责执行barrierCommand
(例如向班主任汇报人已经到齐),接着就唤醒所有还在等待中的线程,开启新一代。
2.是否能重复使用
CountDownLatch
是一次性的,当count
值被减为0后,不会被重置;
而CyclicBarrier
在线程通过屏障后,会开启新的一代,count
值会被重置。
3.锁的类别与所使用到的队列
CountDownLatch
使用的是共享锁,count
值不为0时,线程在sync queue
中等待,自始至终只牵涉到sync queue
,由于使用共享锁,唤醒操作不必等待锁释放后再进行,唤醒操作很迅速。
CyclicBarrier
使用的是独占锁,count
值不为0时,线程进入condition queue
中等待,当count
值降为0后,将被signalAll()
方法唤醒到sync queue
中去,然后依次去争锁(因为是独占锁),在前驱节点释放锁以后,才能继续唤醒后继节点。
二、核心属性
// 由于CyclicBarrier是可重复使用的,因此把每一个新的barrier称为一“代”。
// 一组相互等待的线程属于一代
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock(); // 重入的独占锁
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition(); // 条件队列
/** The number of parties */
private final int parties; // 参与的总线程数
/* The command to run when tripped */
private final Runnable barrierCommand; // 屏障打开时执行的动作
/** The current generation */
private Generation generation = new Generation(); // 代表当前代
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count; // 目前还需要等待的线程数
CyclicBarrier
的核心属性共有6个,将它分为三组。
第一组:
private final int parties; // 参与的总线程数
private int count; // 目前还需要等待的线程数
这两个属性都是用来表征线程的数量,parties
代表了参与线程的总数,即需要一同通过barrier
的线程数,它是final类型的,由构造函数初始化,在类被创建后就一直不变了;count
属性和CountDownLatch
中的count一样,代表还需要等待的线程数,初始值为parties
,每当一个线程到来就减一,如果该值为0,则说明所有的线程都到齐了,所有线程可以一起通过barrier
了。
第二组:
private final ReentrantLock lock = new ReentrantLock(); // 重入的独占锁
private final Condition trip = lock.newCondition(); // 条件队列
private Generation generation = new Generation(); // 代表当前代
这一组代表CyclicBarrier
的基础实现,即CyclicBarrier
是基于独占锁ReentrantLock
和条件队列Condition
实现的,而不是共享锁,所有相互等待的线程都会在同样的条件队列trip
上挂起,被唤醒后将会被添加到sync queue
中去争取独占锁lock,获得锁的线程将继续往下执行。
这里还有一个Generation
对象,从定义上可以看出,它只有一个boolean
类型的broken
属性,关于这个Generation
,下面分析源码的时候再详细讲。
第三组:
private final Runnable barrierCommand; // 屏障打开时执行的动作
这是一个Runnable
对象,代表了一个任务。当所有线程都到齐后,在它们一同通过barrier
之前,就会执行这个对象的run
方法,因此,它有点类似于一个钩子方法。当然这个参数不是必须的,如果线程在通过barrier
之前没有什么特别需要处理的事情,该值可以为null。
三、构造函数
CyclicBarrier
有两个构造函数:
// parties --- 参与的线程数
public CyclicBarrier(int parties) {
this(parties, null);
}
// parties --- 参与的线程数,barrierAction --- 所有线程到达后先执行的动作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中,第一个构造函数本质上也是调用了第二个,即如果不传入Runnable
对象,则barrierCommand
的值默认为null。
可以看出,构造函数就是初始化了parties
,count
,barrierCommand
三个变量。
四、辅助方法
要理解CyclicBarrier
,首先需要弄明白它的几个辅助方法。
首先需要理解的是“代”(Generation)的概念,由于CyclicBarrier
是可重复使用的,因此把每一个新的barrier
称为一“代”。这个怎么理解呢,打个比方:一个过山车有10个座位,景区常常需要等够10个人了,才会去开动过山车。于是游客常常在栏杆(barrier)外面等,等凑够了10个人,工作人员就把栏杆打开,让10个人通过;然后再将栏杆归位,后面新来的人还是要在栏杆外等待。这里,前面已经通过的人就是一“代”,后面再继续等待的一波人就是另外一“代”,栏杆每打开关闭一次,就产生新一的“代”。
在CyclicBarrier
,开启新的一代使用的是nextGeneration
方法:
1.nextGeneration
// 开启新的一代
private void nextGeneration() {
// 唤醒当前这一代中所有等待在条件队列里的线程
trip.signalAll();
// 恢复count值,开启新的一代
count = parties;
// 创建新Generation实例
generation = new Generation();
}
该方法用于开启新的“一代”,通常是被最后一个调用await
方法的线程调用。在该方法中,主要工作就是唤醒当前这一代中所有等待在条件队列里的线程,将count
的值恢复为parties
,以及开启新的一代。
2.breakBarrier
breakBarrier
即打破现有的屏障,让所有线程通过。
// 打破屏障
private void breakBarrier() {
// 标记当前代的broken状态
generation.broken = true;
// 恢复count值
count = parties;
// 唤醒当前这一代中所有等待在条件队列里的线程(因为屏障已经打破了)
trip.signalAll();
}
这个breakBarrier
方法的理解:继续拿上面过上车的例子打比方,有时候某个时间段,景区的人比较少,等待过山车的人数凑不够10个人,眼看后面迟迟没有人再来,这个时候有的工作人员也会打开栅栏,让正在等待的人进来坐过山车。这里工作人员的行为就是breakBarrier
,由于并不是在凑够10个人的情况下就开启了栅栏,就把这一代的broken
状态标记为true
。
3.reset
reset
方法用于将barrier
恢复成初始的状态,它的内部就是简单地调用了breakBarrier
方法和nextGeneration
方法。
// 将CyclicBarrier恢复到初始状态
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation 打破当前的这一代
nextGeneration(); // start a new generation 开启新的一代
} finally {
lock.unlock();
}
}
这里要注意的是,如果在执行该方法时有线程正等待在barrier
上,则该线程将立即返回并抛出BrokenBarrierException
异常。另外,该方法执行前需要先获得锁。
五、await方法
分析完前面的辅助方法之后,接下来来看CyclicBarrier
最核心的await
方法,可以说整个CyclicBarrier
最关键的只有它了。它也是一个集“countDown”和“阻塞等待”于一体的方法。
await
方法有两种版本,一种带超时机制,一种不带,然而从源码上看,它们最终调用的都是带超时机制的dowait
方法:
// 非限时阻塞等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L); // 非限时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// 限时阻塞等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout)); // 限时阻塞等待
}
dowait
方法定义如下,它是整个CyclicBarrier
的核心,下面直接在代码中以注释的形式分析:
// 阻塞等待,返回该线程是第几个到达屏障的索引(可设置等待时间)
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 所有执行await方法的线程必须是已经持有了锁,所以这里必须先获取锁
lock.lock();
try {
final Generation g = generation; // 当前代
// 前面说过,调用breakBarrier方法会将当前“代”的broken属性设为true
// 如果一个正在await的线程发现barrier已经被break了,则将直接抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程在进入dowait方法时已经被中断,则先将屏障打破,再抛出InterruptedException
// 这么做的原因是,所有等待在barrier的线程都是相互等待的,如果其中一个被中断了,那其他的就
// 不用等了。
if (Thread.interrupted()) {
breakBarrier(); // 打破屏障
throw new InterruptedException();
}
// 当前线程已经来到了屏障前,先将等待的线程数减一
int index = --count;
// 如果等待的线程数为0了,则说明所有的parties都到齐了
// 则可以唤醒所有等待的线程,让大家一起通过屏障,并重置屏障
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 如果创建CyclicBarrier时传入了barrierCommand
// 说明通过屏障前有一些额外的工作要做
command.run();
ranAction = true;
// 唤醒当前代中的所有线程,开启新一代
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier(); // 执行动作失败,打破屏障
}
}
// 如果count数不为0,就将当前线程挂起,直到所有的线程到齐,或者超时,或者中断发生
for (;;) {
try {
// 如果没有设定超时机制,则直接调用Condition的await方法
if (!timed)
trip.await(); // 当前线程在这里被挂起
else if (nanos > 0L)
// 如果设了超时,则等待指定的时间
nanos = trip.awaitNanos(nanos); // 当前线程在这里被挂起,超时时间到了就会自动唤醒
} catch (InterruptedException ie) {
// 执行到这里说明线程被中断了
// 如果线程被中断时还处于当前这一“代”,并且当前这一代还没有被broken,则先打破屏障
if (g == generation && ! g.broken) {
breakBarrier(); // 打破屏障
throw ie; // 抛出中断异常
} else {
// 注意来到这里有两种情况
// 一种是g!=generation,说明新的一代已经产生了,所以这里没有必要处理这个中断,只要再自我中断一下就好,交给后续的逻辑处理
// 一种是g.broken = true, 说明中断前栅栏已经被打破了,既然中断发生时栅栏已经被打破了,也没有必要再处理这个中断了
Thread.currentThread().interrupt();
}
}
// 注意,执行到这里是对应于线程从await状态被唤醒了
// 这里先检测broken状态,能使broken状态变为true的,只有breakBarrier()方法,到这里对应的场景是
// 1. 其他执行await方法的线程在挂起前就被中断了
// 2. 其他执行await方法的线程在还处于等待中时被中断了
// 2. 最后一个到达的线程在执行barrierCommand的时候发生了错误
// 4. reset()方法被调用
if (g.broken)
throw new BrokenBarrierException();
// 如果线程被唤醒时,新一代已经被开启了,说明一切正常,直接返回
if (g != generation)
return index;
// 如果是因为超时时间到了被唤醒,则打破栅栏,返回TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
这个await
方法虽然包揽了countDown
、阻塞线程、唤醒线程、执行barrierCommand
任务、开启新一代,处理中断等诸多任务,但是代码本身还是比较好懂的。
值得注意的是,await
方法是有返回值的,代表了线程到达的顺序,第一个到达的线程的index
为parties - 1
,最后一个到达的线程的index
为0
六、工具方法
除了重要的await
方法和一些辅助方法,CyclicBarrier
还提供了一些工具方法:
(1)获取参与的线程数parties
// 获取参与的线程数parties
public int getParties() {
return parties;
}
parties
在构造完成后就不会被修改了,因此对它的访问不需要加锁。
(2)获取正在等待中的线程数
// 获取正在等待中的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
注意,这里加了锁,因为count
属性可能会被多个线程同时修改。
(3)判断当前barrier
是否已经broken
// 判断当前barrier是否已经broken
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
注意,这里同样要加锁,因为broken
属性可能被多个线程同时访问或修改。
七、实例
// CyclicBarrier示例 多线程计算,并将结果合并
public class Solver {
private final int N; // 行数
private final float[][] data; // 二维数据
private final CyclicBarrier barrier; // 同步屏障
class Worker implements Runnable {
int myRow; // 第几行
Worker(int row) {
myRow = row;
}
@Override
public void run() {
while (!done()) {
processRow(myRow); // 处理这一行
try {
barrier.await(); // 处理完等待
} catch (InterruptedException | BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) throws InterruptedException {
data = matrix;
N = matrix.length;
Runnable barrierAction = new Runnable() { // 所有线程处理完之后先执行的动作
@Override
public void run() {
mergeRows(); // 合并结果,此时done()返回true
}
};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads) {
thread.join(); // 等待所有线程处理完成
}
}
// 合并结果
private void mergeRows() {
}
// 处理一行
private void processRow(int row) {
}
// 是否已完成
private boolean done() {
return false;
}
}
在这个例子中,为传入的matrix
数组的每一行都创建了一个线程进行处理,使用了CyclicBarrier
来保证只有所有的线程都处理完之后,才会调用mergeRows(...)
方法来合并结果。只要有一行没有处理完,所有的线程都会在barrier.await()
处等待,最后一个执行完的线程将会负责唤醒所有等待的线程。
总结
CyclicBarrier
实现了类似CountDownLatch
的逻辑,它可以使得一组线程之间相互等待,直到所有的线程都到齐了之后再继续往下执行。CyclicBarrier
基于条件队列和独占锁来实现,而非共享锁。CyclicBarrier
可重复使用,在所有线程都到齐了一起通过后,将会开启新的一代。CyclicBarrier
使用了“all-or-none breakage model”
,所有互相等待的线程,要么一起通过barrier
,要么一个都不要通过,如果有一个线程因为中断,失败或者超时而过早的离开了barrier
(抛出异常),则该barrier
会被broken
掉,所有等待在该barrier
上的其他线程都会抛出BrokenBarrierException
(或者InterruptedException
)。