通信工具类

JDK 中提供了一些并发编程中常用的通信工具类以供我们开发者使用。 它们都在 JUC 包下。先总体概括一下都有哪些工具类,它们有什么作用,然后再分别介绍它们的主要使用方法和原理。

JDK 中提供了一些并发编程中常用的通信工具类以供我们开发者使用

作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为 0 时开始工作
CyclicBarrier 作用跟 CountDownLatch 类似,但是可以重复使用
Phaser 增强的 CyclicBarrier

CountDownLatch

一个减法计数器, 基于信号量

class CountTest{
	  public static void main(String[] args) throws InterruptedException {
		//设置一个尺寸为10的信号量
		 //总数是10,必须要执行任务的时候,再使用
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for(int i=0; i<10; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" Go out");
                countDownLatch.countDown(); //数量减1
            },String.valueOf(i)).start();
        }

        countDownLatch.await(); //等待计数器归零,再向下执行

        System.out.println("Hello world!");
    }

}

两个方法

  • countDown() 调用后计数减一
  • await() 使线程挂起

Exchanger

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner’s object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。
class FillAndEmpty {  
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();  
    DataBuffer initialEmptyBuffer = ... 
    DataBuffer initialFullBuffer = ...  
  
    class FillingLoop implements Runnable {  
        public void run() {  
	        //初始化缓存
            DataBuffer currentBuffer = initialEmptyBuffer;  
            try {  
                while (currentBuffer != null) {  
                    addToBuffer(currentBuffer);  
                    if (currentBuffer.isFull()) 
	                    //如果缓存满, 需要交换, 在这里等 
                        currentBuffer = exchanger.exchange(currentBuffer);  
                }  
            } catch (InterruptedException ex) { ... handle ... }  
        }  
    }  
  
    class EmptyingLoop implements Runnable {  
        public void run() {  
            DataBuffer currentBuffer = initialFullBuffer;  
            try {  
                while (currentBuffer != null) {  
                    takeFromBuffer(currentBuffer);  
                    if (currentBuffer.isEmpty()) 
	                    //如果缓存空, 需要交换, 在这里等 
                        currentBuffer = exchanger.exchange(currentBuffer);  
                }  
            } catch (InterruptedException ex) { ... handle ...}  
        }  
    }  
  
    void start() {  
        new Thread(new FillingLoop()).start();  
        new Thread(new EmptyingLoop()).start();  
    }  
}

当一个线程调用 exchange 方法后,会处于阻塞状态,只有当另一个线程也调用了 exchange 方法,它才会继续执行。

image.png

内部使用 CAS 检查, 使用park/unpark 做状态转换