Java线程同步器

Java提供了几个帮助多个线程协作的类。

CyclicBarrier

允许线程集等待直至其中预定数目的线程到达一个公共障栅 ( barrier ), 然后可以选择执行一个处理障栅的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CyclicBarrierTest {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("over..."));

new People("threadA", cyclicBarrier).start();
new People("threadB", cyclicBarrier).start();

}
}

class People extends Thread {

CyclicBarrier cyclicBarrier;

public People(String name, CyclicBarrier cyclicBarrier) {
setName(name);
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println(getName() + " is waiting...");
try {
cyclicBarrier.await();
System.out.println(getName() + " is running...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
5
threadA is waiting...
threadB is waiting...
over...
threadB is running...
threadA is running...

Phaser

类似于循环障栅 , 不过有一个可变的计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PhaserTest {

public static void main(String[] args) {
Phaser phaser = new Phaser(3);

new Transfer("transfer1", phaser).start();
new Transfer("transfer2", phaser).start();
new Transfer("transfer3", phaser).start();
}
}

class Transfer extends Thread {

private Phaser phaser;

String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"};

public Transfer(String name, Phaser phaser) {
setName(name);
this.phaser = phaser;
}

@Override
public void run() {
System.out.println(getName() + status[0]);
for (int i = 1; i <= status.length - 1; i++) {
phaser.arriveAndAwaitAdvance();
System.out.println(getName() + status[i]);
}
phaser.arriveAndDeregister();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
transfer1已收件
transfer3已收件
transfer2已收件
transfer2已到达上海
transfer1已到达上海
transfer3已到达上海
transfer3已到达北京
transfer1已到达北京
transfer2已到达北京
transfer2已签收
transfer1已签收
transfer3已签收

相当于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CyclicBarrierTest {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

new People("transfer1", cyclicBarrier).start();
new People("transfer2", cyclicBarrier).start();
new People("transfer3", cyclicBarrier).start();
}
}

class People extends Thread {

CyclicBarrier cyclicBarrier;

String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"};

public People(String name, CyclicBarrier cyclicBarrier) {
setName(name);
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println(getName() + status[0]);
for (int i = 1; i <= status.length - 1; i ++) {
try {
cyclicBarrier.await();
System.out.println(getName() + status[i]);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
transfer2已收件
transfer1已收件
transfer3已收件
transfer1已到达上海
transfer3已到达上海
transfer2已到达上海
transfer2已到达北京
transfer1已到达北京
transfer3已到达北京
transfer3已签收
transfer1已签收
transfer2已签收

CountDownLatch

允许线程集等待直到计数器减为 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CountDownLatchTest {

public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);

new Referee("referee", countDownLatch).start();
new Runner("runner1", countDownLatch).start();
new Runner("runner2", countDownLatch).start();
new Runner("runner3", countDownLatch).start();
}
}

class Runner extends Thread {

private CountDownLatch countDownLatch;

public Runner(String name, CountDownLatch countDownLatch) {
setName(name);
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
System.out.println(getName() + " is ready.");
countDownLatch.countDown();
}
}

class Referee extends Thread {

private CountDownLatch countDownLatch;

public Referee(String name, CountDownLatch countDownLatch) {
setName(name);
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
System.out.println(getName() + " is waiting.");
countDownLatch.await();
System.out.println(getName() + " blew the spoon.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
runner2 is ready.
runner1 is ready.
runner3 is ready.
比赛开始!

Exchanger

允许两个线程在要交换的对象准备好时交换对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ExchangerTest {

public static void main(String[] args) {
Exchanger exchanger = new Exchanger();

new Person("person1", exchanger).start();
new Person("person2", exchanger).start();
}
}

class Person extends Thread {

private Exchanger exchanger;

public Person(String name, Exchanger exchanger) {
setName(name);
this.exchanger = exchanger;
}

@Override
public void run() {
try {
System.out.println(getName() + "听到电话里嘟。。。\"");
String saySomething = (String) exchanger.exchange("你好,我是" + getName());
System.out.println(getName() + "听到对方说: \"" + saySomething + "\"");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
person2听到电话里嘟。。。"
person1听到电话里嘟。。。"
person2听到对方说: "你好,我是person1"
person1听到对方说: "你好,我是person2"

Semaphore

允许线程集等待直到被允许继续运行为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SemaphoreTest {

public static void main(String[] args) {
Semaphore seats = new Semaphore(2);

new Man("man1", seats).start();
new Man("man2", seats).start();
new Man("man3", seats).start();
}
}

class Man extends Thread {

private Semaphore seats;

public Man(String name, Semaphore seats) {
setName(name);
this.seats = seats;
}

@Override
public void run() {
System.out.println(getName() + "排队。。。");
try {
seats.acquire();
System.out.println(getName() + "坐下。。。");
Thread.sleep(1000);
System.out.println(getName() + "离开。。。");
seats.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
man1排队。。。
man3排队。。。
man3坐下。。。
man2排队。。。
man1坐下。。。
man3离开。。。
man1离开。。。
man2坐下。。。
man2离开。。。

SynchronousQueue

允许一个线程把对象交给另一个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class SynchronousQueueTest {

public static void main(String[] args) {
SynchronousQueue synchronousQueue = new SynchronousQueue();

new Producer("producer", synchronousQueue).start();
new Consumer("consumer", synchronousQueue).start();
}
}

class Producer extends Thread {

private SynchronousQueue synchronousQueue;

public Producer(String name, SynchronousQueue synchronousQueue) {
setName(name);
this.synchronousQueue = synchronousQueue;
}

@Override
public void run() {
try {
System.out.println(getName() + "开始生产。。。");
List<Integer> integerList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
integerList.add(i);
}

System.out.println(getName() + "将生产的数据放入队列。。。");
synchronousQueue.put(integerList);

System.out.println(getName() + "又开始生产。。。");
integerList = new ArrayList<>();
for (int i = 3; i < 6; i++) {
integerList.add(i);
}

System.out.println(getName() + "又一次放入了数据。。。");
synchronousQueue.put(integerList);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer extends Thread {

private SynchronousQueue synchronousQueue;

public Consumer(String name, SynchronousQueue synchronousQueue) {
setName(name);
this.synchronousQueue = synchronousQueue;
}

@Override
public void run() {
System.out.println(getName() + "等待消费。。。");
try {
System.out.println(getName() + "拿到了数据。。。");
List<Integer> integerList = (List<Integer>) synchronousQueue.take();
integerList.forEach(System.out::println);
System.out.println(getName() + "又一次拿到了数据。。。");
integerList = (List<Integer>) synchronousQueue.take();
integerList.forEach(System.out::println);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
producer开始生产。。。
producer将生产的数据放入队列。。。
consumer等待消费。。。
consumer拿到了数据。。。
producer又开始生产。。。
producer又一次放入了数据。。。
0
1
2
consumer又一次拿到了数据。。。
3
4
5
-------------本文结束感谢您的阅读-------------