CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
注意比较CountDownLatch和CyclicBarrier:
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
CyclicBarrier函数列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction)
int await()
int await(long timeout, TimeUnit unit)
int getNumberWaiting()
int getParties()
boolean isBroken()
void reset()
|
CyclicBarrier数据结构
CyclicBarrier的UML类图如下:
CyclicBarrier是包含了”ReentrantLock对象lock”和”Condition对象trip”,它是通过独占锁实现的。
CyclicBarrier原理分析
构造函数
CyclicBarrier的构造函数共2个:CyclicBarrier和CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码。
1 2 3 4 5 6 7 8 9
| public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
|
等待函数
CyclicBarrier.java中await()方法如下:
1 2 3 4 5 6 7
| public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } }
|
await()是通过dowait()实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;
if (g.broken) throw new BrokenBarrierException();
if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); }
int index = --count; if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } }
if (g.broken) throw new BrokenBarrierException();
if (g != generation) return index;
if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
|
dowait()的作用就是让当前线程阻塞,直到“有parties个线程到达barrier”或“当前线程被中断”或“超时”这3者之一发生,当前线程才继续执行。
generation是CyclicBarrier的一个成员遍历,它的定义如下:
1 2 3 4
| private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
|
在CyclicBarrier中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier中通过generation对象,记录属于哪一代。当有parties个线程到达barrier,generation就会被更新换代。
如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier。
1 2 3 4 5
| private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
|
breakBarrier()会设置当前中断标记broken为true,意味着“将该Generation中断”;同时,设置count=parties,即重新初始化count;最后,通过signalAll()唤醒CyclicBarrier上所有的等待线程。
将“count计数器”-1,即–count;然后判断是不是“有parties个线程到达barrier”,即index是不是为0。
当index=0时,如果barrierCommand不为null,则执行该barrierCommand,barrierCommand就是我们创建CyclicBarrier时,传入的Runnable对象。然后,调用nextGeneration()进行换代工作.
1 2 3 4 5
| private void nextGeneration() { trip.signalAll(); count = parties; generation = new Generation(); }
|
首先,它会调用signalAll()唤醒CyclicBarrier上所有的等待线程;接着,重新初始化count;最后,更新generation的值。
在for(;;)循环中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| public class CyclicBarrierDemo { final CyclicBarrier barrier; final int MAX_TASK;
public CyclicBarrierDemo(int cnt) { barrier = new CyclicBarrier(cnt + 1); MAX_TASK = cnt; }
public void doWork(final Runnable work) { new Thread() { public void run() { work.run(); try { int index = barrier.await(); doWithIndex(index); } catch (InterruptedException e) { return; } catch (BrokenBarrierException e) { return; } } }.start(); }
private void doWithIndex(int index) { if (index == MAX_TASK / 3) { System.out.println("Left 30%."); } else if (index == MAX_TASK / 2) { System.out.println("Left 50%"); } else if (index == 0) { System.out.println("run over"); } }
public void waitForNext() { try { doWithIndex(barrier.await()); } catch (InterruptedException e) { return; } catch (BrokenBarrierException e) { return; } }
public static void main(String[] args) { final int count = 10; CyclicBarrierDemo demo = new CyclicBarrierDemo(count); for (int i = 0; i < 100; i++) { demo.doWork(new Runnable() {
public void run() { try { Thread.sleep(1000L); } catch (Exception e) { return; } } }); if ((i + 1) % count == 0) { demo.waitForNext(); } } }
}
|