Java 提供了几个帮助多个线程协作的类。

# CyclicBarrier

允许线程集等待直至其中预定数目的线程到达一个公共障栅 ( barrier ), 然后可以选择执行一个处理障栅的动作。

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("over..."));
        new People("threadA", cyclicBarrier).start();
        new People("threadB", cyclicBarrier).start();
    }
}
class People extends Thread {
    CyclicBarrier cyclicBarrier;
    public People(String name, CyclicBarrier cyclicBarrier) {
        setName(name);
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println(getName() + " is waiting...");
        try {
            cyclicBarrier.await();
            System.out.println(getName() + " is running...");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

threadA is waiting...
threadB is waiting...
over...
threadB is running...
threadA is running...

# Phaser

类似于循环障栅 , 不过有一个可变的计数。

public class PhaserTest {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        new Transfer("transfer1", phaser).start();
        new Transfer("transfer2", phaser).start();
        new Transfer("transfer3", phaser).start();
    }
}
class Transfer extends Thread {
    private Phaser phaser;
    String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"};
    public Transfer(String name, Phaser phaser) {
        setName(name);
        this.phaser = phaser;
    }
    @Override
    public void run() {
        System.out.println(getName() + status[0]);
        for (int i = 1; i <= status.length - 1; i++) {
            phaser.arriveAndAwaitAdvance();
            System.out.println(getName() + status[i]);
        }
        phaser.arriveAndDeregister();
    }
}

运行结果:

transfer1已收件
transfer3已收件
transfer2已收件
transfer2已到达上海
transfer1已到达上海
transfer3已到达上海
transfer3已到达北京
transfer1已到达北京
transfer2已到达北京
transfer2已签收
transfer1已签收
transfer3已签收

相当于:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        new People("transfer1", cyclicBarrier).start();
        new People("transfer2", cyclicBarrier).start();
        new People("transfer3", cyclicBarrier).start();
    }
}
class People extends Thread {
    CyclicBarrier cyclicBarrier;
    String[] status = {"已收件", "已到达上海", "已到达北京", "已签收"};
    public People(String name, CyclicBarrier cyclicBarrier) {
        setName(name);
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println(getName() + status[0]);
        for (int i = 1; i <= status.length - 1; i ++) {
            try {
                cyclicBarrier.await();
                System.out.println(getName() + status[i]);
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

transfer2已收件
transfer1已收件
transfer3已收件
transfer1已到达上海
transfer3已到达上海
transfer2已到达上海
transfer2已到达北京
transfer1已到达北京
transfer3已到达北京
transfer3已签收
transfer1已签收
transfer2已签收

# CountDownLatch

允许线程集等待直到计数器减为 0。

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new Referee("referee", countDownLatch).start();
        new Runner("runner1", countDownLatch).start();
        new Runner("runner2", countDownLatch).start();
        new Runner("runner3", countDownLatch).start();
    }
}
class Runner extends Thread {
    private CountDownLatch countDownLatch;
    public Runner(String name, CountDownLatch countDownLatch) {
        setName(name);
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        System.out.println(getName() + " is ready.");
        countDownLatch.countDown();
    }
}
class Referee extends Thread {
    private CountDownLatch countDownLatch;
    public Referee(String name, CountDownLatch countDownLatch) {
        setName(name);
        this.countDownLatch = countDownLatch;
    }
    @Override
    public void run() {
        try {
            System.out.println(getName() + " is waiting.");
            countDownLatch.await();
            System.out.println(getName() + " blew the spoon.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

runner2 is ready.
runner1 is ready.
runner3 is ready.
比赛开始!

# Exchanger

允许两个线程在要交换的对象准备好时交换对象。

public class ExchangerTest {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger();
        new Person("person1", exchanger).start();
        new Person("person2", exchanger).start();
    }
}
class Person extends Thread {
    private Exchanger exchanger;
    public Person(String name, Exchanger exchanger) {
        setName(name);
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        try {
            System.out.println(getName() + "听到电话里嘟。。。\"");
            String saySomething = (String) exchanger.exchange("你好,我是" + getName());
            System.out.println(getName() + "听到对方说: \"" + saySomething + "\"");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

person2听到电话里嘟。。。"
person1听到电话里嘟。。。"
person2听到对方说: "你好,我是person1"
person1听到对方说: "你好,我是person2"

# Semaphore

允许线程集等待直到被允许继续运行为止。

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore seats = new Semaphore(2);
        new Man("man1", seats).start();
        new Man("man2", seats).start();
        new Man("man3", seats).start();
    }
}
class Man extends Thread {
    private Semaphore seats;
    public Man(String name, Semaphore seats) {
        setName(name);
        this.seats = seats;
    }
    @Override
    public void run() {
        System.out.println(getName() + "排队。。。");
        try {
            seats.acquire();
            System.out.println(getName() + "坐下。。。");
            Thread.sleep(1000);
            System.out.println(getName() + "离开。。。");
            seats.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

man1排队。。。
man3排队。。。
man3坐下。。。
man2排队。。。
man1坐下。。。
man3离开。。。
man1离开。。。
man2坐下。。。
man2离开。。。

# SynchronousQueue

允许一个线程把对象交给另一个线程。

public class SynchronousQueueTest {
    public static void main(String[] args) {
        SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Producer("producer", synchronousQueue).start();
        new Consumer("consumer", synchronousQueue).start();
    }
}
class Producer extends Thread {
    private SynchronousQueue synchronousQueue;
    public Producer(String name, SynchronousQueue synchronousQueue) {
        setName(name);
        this.synchronousQueue = synchronousQueue;
    }
    @Override
    public void run() {
        try {
            System.out.println(getName() + "开始生产。。。");
            List<Integer> integerList = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                integerList.add(i);
            }
            System.out.println(getName() + "将生产的数据放入队列。。。");
            synchronousQueue.put(integerList);
            System.out.println(getName() + "又开始生产。。。");
            integerList = new ArrayList<>();
            for (int i = 3; i < 6; i++) {
                integerList.add(i);
            }
            System.out.println(getName() + "又一次放入了数据。。。");
            synchronousQueue.put(integerList);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
class Consumer extends Thread {
    private SynchronousQueue synchronousQueue;
    public Consumer(String name, SynchronousQueue synchronousQueue) {
        setName(name);
        this.synchronousQueue = synchronousQueue;
    }
    @Override
    public void run() {
        System.out.println(getName() + "等待消费。。。");
        try {
            System.out.println(getName() + "拿到了数据。。。");
            List<Integer> integerList = (List<Integer>) synchronousQueue.take();
            integerList.forEach(System.out::println);
            System.out.println(getName() + "又一次拿到了数据。。。");
            integerList = (List<Integer>) synchronousQueue.take();
            integerList.forEach(System.out::println);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

producer开始生产。。。
producer将生产的数据放入队列。。。
consumer等待消费。。。
consumer拿到了数据。。。
producer又开始生产。。。
producer又一次放入了数据。。。
0
1
2
consumer又一次拿到了数据。。。
3
4
5