Java线程间协作(三)

Posted by kyle on May 8, 2018

阻塞队列

从传输通道中存入数据或取出数据时,相应的线程可能因为传输通道中没有数据或者其存储空间已满而挂起,这种传输通道的运作方式称为阻塞式

一般而言,一个方法或者操作如果能够导致其执行线程被挂起,那么我们就称相应的的方法/操作为阻塞方法或者阻塞操作

常见的阻塞方法包括:InputStream.read()、ReentrantLock.lock()、申请内部锁等。

JDK1.5中引入的接口java.util.concurrent.BlockingQueue定义了一种线程安全的队列——阻塞队列。

阻塞队列按照其存储空间的容量是否受限制来划分,可分为有界队列(Bounded Queue)无界队列(Unbounded Queue)。有界队列的存储容量是由应用程序指定的,无界队列的最大存储容量为Integer.MAX_VALUE($2^{31} - 1$)。

往队列中存入一个元素的操作被称为put操作,从队列中取出一个元素的操作被称为take操作

ArrayBlockingQueue

ArrayBlockingQueue内部使用一个数组作为其存储空间,而数组的存储空间是预先分配的,因此ArrayBlockingQueue的put/take操作本身并不会增加垃圾回收的负担。ArrayBlockingQueue的缺点是其内部在实现put/take操作的时候使用的是同一个显示锁,从而导致锁的高争用,进而导致较多的上下文切换。

LinkedBlockingQueue

LinkedBlockingQueue既能实现无界队列,也能实现有界队列。LinkedBlockingQueue的其中一个构造函数允许我们创建队列的时候指定队列容量。LinkedBlockingQueue的优点是其内部在实现put/take操作的时候分别使用了两个显示锁(putLock/takeLock),这降低了锁的争用。LinkedBlockingQueue的内部存储空间是一个链表,而链表节点所需的存储空间是动态分配的,因此LinkedBlockingQueue的缺点是它可能增加垃圾回收的负担。此外,由于put/take操作使用的是两个锁,LinkedBlockingQueue维护其队列的当前长度(Size)时无法使用一个普通的int型变量,而是使用了原子变量AtomicInteger,这也会导致额外的开销。

SynchronousQueue

SynchronousQueue可以看成一种特殊的有界队列。其内部并不维护用户存储队列元素的存储空间。 不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。 SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

信号量:Semaphore

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。

JDK1.5中引入的标准类库java.util.concurrent.Semaphore可以用来实现信号量控制。Semaphore.acquire()/release()分别用于申请信号量和释放信号量。

TIPS:

  • acquire()和release()总是配对使用;
  • release()调用总是应该放在一个finally块中,以避免程序执行发生异常时当前申请的信号量无法释放;
  • 创建Semaphore实例时,如果构造器中参数permits值为1,那么所创建的Semaphore实例相当于一个互斥锁。与其他互斥锁不同的是:由于一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release(),因此这种互斥锁允许一个线程释放另外一个线程锁持有的锁;
  • 默认情况下,Semaphore采用的是非公平性调度策略。

管道:PipedOutputStream/PipedInputStream

PipedOutputStream/PipedInputStream分别是OutputStream/InputStream的一个子类,它们可以用来实现线程间的直接输出和输入。

所谓的“直接”是指从应用代码的角度来看,一个线程的输出可作为另外一个线程的输入,而不必借用文件、数据库、网络连接等其他数据交换中介。

PipedOutputStream相当于生产者,其生产的产品是字节形式的数据;PipedInputStream相当于消费者,内部使用byte型数组维护了一个循环缓冲区(Cirular Buffer),这个缓冲区相当于传输通道。

在PipedOutputStream/PipedInputStream进行输入、输出操作前,PipedOutputStream实例和PipedInputStream实例需要建立起关联(Connect)。两者之间可以通过调用各自实例的connect方法实现关联,也可以通过在创建相应实例的时候将对方的实例指定为自己的构造器参数来实现。

双缓冲:Exchanger

缓冲(Buffering)是一种常用的数据传递技术。缓冲区相当于数据源(Source)与数据使用方(Sink)之间的数据容器。

在多线程环境下,有时候我们会使用两个(或者更多)缓冲区来实现数据从数据源到数据使用方的移动。其中一个缓冲区填充来自数据源的数据后可以被数据使用方进行“消费”,而另外一个空的(或者是已经取完数据的)缓冲区则用来填充来自数据源的新的数据。

数据源与数据使用方通过轮次访问缓存区,实现了数据生成与消费的并发。这种缓冲技术就被称为双缓冲

JDK1.5中引入的标准类库java.util.concurrent.Exchanger可以用来实现双缓冲。