Java 提供了几个帮助多个线程协作的类。
# CyclicBarrier
允许线程集等待直至其中预定数目的线程到达一个公共障栅 ( barrier ), 然后可以选择执行一个处理障栅的动作。
public class CyclicBarrierTest { | |
public static void main(String[] args) { | |
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("over...")); | |
new People("threadA", cyclicBarrier).start(); | |
new People("threadB", cyclicBarrier).start(); | |
} | |
} | |
class People extends Thread { | |
CyclicBarrier cyclicBarrier; | |
public People(String name, CyclicBarrier cyclicBarrier) { | |
setName(name); | |
this.cyclicBarrier = cyclicBarrier; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + " is waiting..."); | |
try { | |
cyclicBarrier.await(); | |
System.out.println(getName() + " is running..."); | |
} catch (InterruptedException | BrokenBarrierException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
运行结果:
threadA is waiting...
threadB is waiting...
over...
threadB is running...
threadA is running...
# Phaser
类似于循环障栅 , 不过有一个可变的计数。
public class PhaserTest { | |
public static void main(String[] args) { | |
Phaser phaser = new Phaser(3); | |
new Transfer("transfer1", phaser).start(); | |
new Transfer("transfer2", phaser).start(); | |
new Transfer("transfer3", phaser).start(); | |
} | |
} | |
class Transfer extends Thread { | |
private Phaser phaser; | |
String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"}; | |
public Transfer(String name, Phaser phaser) { | |
setName(name); | |
this.phaser = phaser; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + status[0]); | |
for (int i = 1; i <= status.length - 1; i++) { | |
phaser.arriveAndAwaitAdvance(); | |
System.out.println(getName() + status[i]); | |
} | |
phaser.arriveAndDeregister(); | |
} | |
} |
运行结果:
transfer1已收件
transfer3已收件
transfer2已收件
transfer2已到达上海
transfer1已到达上海
transfer3已到达上海
transfer3已到达北京
transfer1已到达北京
transfer2已到达北京
transfer2已签收
transfer1已签收
transfer3已签收
相当于:
public class CyclicBarrierTest { | |
public static void main(String[] args) { | |
CyclicBarrier cyclicBarrier = new CyclicBarrier(3); | |
new People("transfer1", cyclicBarrier).start(); | |
new People("transfer2", cyclicBarrier).start(); | |
new People("transfer3", cyclicBarrier).start(); | |
} | |
} | |
class People extends Thread { | |
CyclicBarrier cyclicBarrier; | |
String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"}; | |
public People(String name, CyclicBarrier cyclicBarrier) { | |
setName(name); | |
this.cyclicBarrier = cyclicBarrier; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + status[0]); | |
for (int i = 1; i <= status.length - 1; i ++) { | |
try { | |
cyclicBarrier.await(); | |
System.out.println(getName() + status[i]); | |
} catch (InterruptedException | BrokenBarrierException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
运行结果:
transfer2已收件 | |
transfer1已收件 | |
transfer3已收件 | |
transfer1已到达上海 | |
transfer3已到达上海 | |
transfer2已到达上海 | |
transfer2已到达北京 | |
transfer1已到达北京 | |
transfer3已到达北京 | |
transfer3已签收 | |
transfer1已签收 | |
transfer2已签收 |
# CountDownLatch
允许线程集等待直到计数器减为 0。
public class CountDownLatchTest { | |
public static void main(String[] args) { | |
CountDownLatch countDownLatch = new CountDownLatch(3); | |
new Referee("referee", countDownLatch).start(); | |
new Runner("runner1", countDownLatch).start(); | |
new Runner("runner2", countDownLatch).start(); | |
new Runner("runner3", countDownLatch).start(); | |
} | |
} | |
class Runner extends Thread { | |
private CountDownLatch countDownLatch; | |
public Runner(String name, CountDownLatch countDownLatch) { | |
setName(name); | |
this.countDownLatch = countDownLatch; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + " is ready."); | |
countDownLatch.countDown(); | |
} | |
} | |
class Referee extends Thread { | |
private CountDownLatch countDownLatch; | |
public Referee(String name, CountDownLatch countDownLatch) { | |
setName(name); | |
this.countDownLatch = countDownLatch; | |
} | |
@Override | |
public void run() { | |
try { | |
System.out.println(getName() + " is waiting."); | |
countDownLatch.await(); | |
System.out.println(getName() + " blew the spoon."); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
运行结果:
runner2 is ready.
runner1 is ready.
runner3 is ready.
比赛开始!
# Exchanger
允许两个线程在要交换的对象准备好时交换对象。
public class ExchangerTest { | |
public static void main(String[] args) { | |
Exchanger exchanger = new Exchanger(); | |
new Person("person1", exchanger).start(); | |
new Person("person2", exchanger).start(); | |
} | |
} | |
class Person extends Thread { | |
private Exchanger exchanger; | |
public Person(String name, Exchanger exchanger) { | |
setName(name); | |
this.exchanger = exchanger; | |
} | |
@Override | |
public void run() { | |
try { | |
System.out.println(getName() + "听到电话里嘟。。。\""); | |
String saySomething = (String) exchanger.exchange("你好,我是" + getName()); | |
System.out.println(getName() + "听到对方说: \"" + saySomething + "\""); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
运行结果:
person2听到电话里嘟。。。"
person1听到电话里嘟。。。"
person2听到对方说: "你好,我是person1"
person1听到对方说: "你好,我是person2"
# Semaphore
允许线程集等待直到被允许继续运行为止。
public class SemaphoreTest { | |
public static void main(String[] args) { | |
Semaphore seats = new Semaphore(2); | |
new Man("man1", seats).start(); | |
new Man("man2", seats).start(); | |
new Man("man3", seats).start(); | |
} | |
} | |
class Man extends Thread { | |
private Semaphore seats; | |
public Man(String name, Semaphore seats) { | |
setName(name); | |
this.seats = seats; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + "排队。。。"); | |
try { | |
seats.acquire(); | |
System.out.println(getName() + "坐下。。。"); | |
Thread.sleep(1000); | |
System.out.println(getName() + "离开。。。"); | |
seats.release(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
运行结果:
man1排队。。。
man3排队。。。
man3坐下。。。
man2排队。。。
man1坐下。。。
man3离开。。。
man1离开。。。
man2坐下。。。
man2离开。。。
# SynchronousQueue
允许一个线程把对象交给另一个线程。
public class SynchronousQueueTest { | |
public static void main(String[] args) { | |
SynchronousQueue synchronousQueue = new SynchronousQueue(); | |
new Producer("producer", synchronousQueue).start(); | |
new Consumer("consumer", synchronousQueue).start(); | |
} | |
} | |
class Producer extends Thread { | |
private SynchronousQueue synchronousQueue; | |
public Producer(String name, SynchronousQueue synchronousQueue) { | |
setName(name); | |
this.synchronousQueue = synchronousQueue; | |
} | |
@Override | |
public void run() { | |
try { | |
System.out.println(getName() + "开始生产。。。"); | |
List<Integer> integerList = new ArrayList<>(); | |
for (int i = 0; i < 3; i++) { | |
integerList.add(i); | |
} | |
System.out.println(getName() + "将生产的数据放入队列。。。"); | |
synchronousQueue.put(integerList); | |
System.out.println(getName() + "又开始生产。。。"); | |
integerList = new ArrayList<>(); | |
for (int i = 3; i < 6; i++) { | |
integerList.add(i); | |
} | |
System.out.println(getName() + "又一次放入了数据。。。"); | |
synchronousQueue.put(integerList); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
class Consumer extends Thread { | |
private SynchronousQueue synchronousQueue; | |
public Consumer(String name, SynchronousQueue synchronousQueue) { | |
setName(name); | |
this.synchronousQueue = synchronousQueue; | |
} | |
@Override | |
public void run() { | |
System.out.println(getName() + "等待消费。。。"); | |
try { | |
System.out.println(getName() + "拿到了数据。。。"); | |
List<Integer> integerList = (List<Integer>) synchronousQueue.take(); | |
integerList.forEach(System.out::println); | |
System.out.println(getName() + "又一次拿到了数据。。。"); | |
integerList = (List<Integer>) synchronousQueue.take(); | |
integerList.forEach(System.out::println); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
运行结果:
producer开始生产。。。
producer将生产的数据放入队列。。。
consumer等待消费。。。
consumer拿到了数据。。。
producer又开始生产。。。
producer又一次放入了数据。。。
0
1
2
consumer又一次拿到了数据。。。
3
4
5