Java并发之CyclicBarrier原理及使用

CyclicBarrier字面意思是循环栅栏,它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。

简介

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

Demo

下面一个简单例子,多个游客线程分别在集合点A和B同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class CyclicBarrierDemo {
static class Tourist extends Thread {
CyclicBarrier barrier;

public Tourist(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
// 模拟先各自独立运行
Thread.sleep((int) (Math.random() * 1000));

// 集合点A
barrier.await();

System.out.println(this.getName() + " arrived A "
+ System.currentTimeMillis());

// 集合后模拟再各自独立运行
Thread.sleep((int) (Math.random() * 1000));

// 集合点B
barrier.await();
System.out.println(this.getName() + " arrived B "
+ System.currentTimeMillis());
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}

public static void main(String[] args) {
int num = 3;
Tourist[] threads = new Tourist[num];
CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {

@Override
public void run() {
System.out.println("all arrived " + System.currentTimeMillis()
+ " executed by " + Thread.currentThread().getName());
}
});
for (int i = 0; i < num; i++) {
threads[i] = new Tourist(barrier);
threads[i].start();
}
}
}

执行结果:

1
2
3
4
5
6
7
8
all arrived 1490053578552 executed by Thread-1
Thread-1 arrived A 1490053578555
Thread-2 arrived A 1490053578555
Thread-0 arrived A 1490053578555
all arrived 1490053578889 executed by Thread-0
Thread-0 arrived B 1490053578890
Thread-2 arrived B 1490053578890
Thread-1 arrived B 1490053578890

可以看出多个线程到达A和B的时间是一样的,使用CyclicBarrier,达到了重复同步的目的。

CyclicBarrier原理

构造函数

在我们的使用CyclicBarrier时,第一步肯定是new一个CyclicBarrier对象。

如果我们没有 栅栏任务(barrierAction)需要指定,则就直接使用如下的构造函数构造对象。

1
2
3
4
5
6
7
/*
*创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。
*当开启barrier时并没有任何预先定义的action需要执行。
*/
public CyclicBarrier(int parties) {
this(parties, null);
}

其中构造函数中的参数指的就是你准备需要多少个线程等待至公共屏障点。

如果有 栅栏任务(barrierAction)需要指定,则就需要使用如下的构造函数构造对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。当开启barrier时由最后一个进入barrier的线程来执行预先定义的action。

await()/await(time,TimeUnit)方法

在CyclicBarrier中最重要的方法莫过于await()方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。如下:

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//不超时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await()方法内部调用dowait(boolean timed, long nanos)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)//检查状态,如果为true,则说明已经broken了
throw new BrokenBarrierException();
//检查当前线程是否被中断,如果被中断先调用breakBarrier方法然后抛中断异常
if (Thread.interrupted()) {
breakBarrier();//设置generation并且唤醒所有正在等待的线程
throw new InterruptedException();
}

int index = --count;//先减一然后再赋值
if (index == 0) { // tripped,打开barrier
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//当前线程调用command的run方法
//从这里可以看出,调用barrier的command的run方法在唤醒其他所有正在等待的线程在前。
ranAction = true;
nextGeneration();//唤醒所有的正在等待的线程并且设置状态为下一次重复利用做准备
return 0;
} finally {
if (!ranAction)//如果command中的run方法抛异常,则就运行这里的breakBarrier()方法来唤醒所有其他正在等待的线程。
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
//如果当前线程不是最后一个到达的线程,则一直循环检测等待,直到tripped, broken, interrupted, or timed out发生
for (;;) {
try {
if (!timed) //如果没有设置等待时间,则一直等待,直到其它线程唤醒
trip.await();
else if (nanos > 0L)//如果设置了等待时间,则等待指定的时间。
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果再等待的期间发生了中断异常,如果其它线程还没有开始唤醒工作,则当前线程就开始唤醒
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

/*
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
设置当前的generation状态为broken且唤醒所有正在等待的线程。

*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

/*
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*更新barrier的状态为重复利用做准备并且唤醒所有正在等待的线程
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

await()的处理逻辑:如果该线程不是到达的最后一个线程,则他会一直处于等待状态,除非发生以下情况:

  • 最后一个线程到达,即index == 0
  • 超出了指定时间(超时等待)
  • 其他的某个线程中断当前线程
  • 其他的某个线程中断另一个等待的线程
  • 其他的某个线程在等待barrier超时
  • 其他的某个线程在此barrier调用reset()方法。reset()方法用于将屏障重置为初始状态。

Generation描述着CyclicBarrier的更显换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

1
2
3
private static class Generation {
boolean broken = false;
}

默认barrier是没有损坏的。

当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation。

1
2
3
4
5
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

CyclicBarrier与CountDownLatch区别

  • CountDownLatch的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为0,负责倒计时和等待倒计时的线程都可以有多个,它用于不同角色线程间的同步。
  • CyclicBarrier的参与线程角色是一样的,用于同一角色线程间的协调一致。
  • CountDownLatch是一次性的,而CyclicBarrier是可以重复利用的。
  • CountDownLatch倾向于一个线程等多个线程,CyclicBarrier倾向于多个线程互相等待