1.生产者—消费者模式

在生产者—消费者模式中,生产者 (Producer) 的主要职责是生产(创建)产品(Product)。产品既可以是数据,也可以是任务。消费者 (Consumer) 的主要职责是消费生产者所生产的产品。这里的“消费“包括对产品所代表的数据进行加工处理或者执行产品所代表的任务。生产者和消费者是并发地运行在各自的线程之中的,这就意味着运用生产者—消费者模式可以使程序中原本串行的处理得以并发化。
由于线程之间无法像函数调用那样通过参数直接传递数据,因此生产者和消费者之间需要一个用于传递产品的传输通道 (Channel)。传输通道相当于生产者和消费者之间的缓冲区,生产者每生产一个产品就将其放入传输通道,消费者则不断地从传输通道中取出产品进行消费,传输通道通常可以使用一个线程安全的队列来实现。生产者—消费者模式如下图所示,其中 Producer 可以运行在一个或者多个线程中, Consumer 也可以运行在一个或者多个线程中。

由于生产者和消费者运行在不同的线程中,因此生产者将产品(对象)存入传输通道,消费者再从相应的传输通道中取出产品的过程其实就是生产者线程将对象(产品)发布到消费者线程的过程,这种对象发布必须是线程安全的。
通常,生产者和消费者的处理能力是不同的,即生产者生产产品的速率和消费者消费产品的速率是不同的、较为常见的情形是生产者的处理能力比消费者的处理能力大。这种情况下,传输通道所起的作用不仅仅作为生产者和消费者之间传递数据的中介,它在一定程度上还起到一个平衡生产者和消费者处理能力的作用。

2.阻塞队列

从传输通道中存入一个产品或者取出一个产品时,相应的线程可能因为传输通道中没有产品或者其存储空间已满而被阻塞(暂停)。称这种传输通道的运作方式为阻塞式。BlockingQueue的常用实现类包括ArrayBlockingQueue 、 LinkedBlockingQueue 和SynchronousQueue 等。

  • 有界队列可以使用 ArrayBlockingQueue 或者 LinkedBlockingQueue 来实现。ArrayBlockingQueue内部使用一个数组作为其存储空间,而数组的存储空间是预先分配的,因此 ArrayBlockingQueue 的put 操作、 take 操作本身并不会增加垃圾回收的负担。ArrayBlockingQueue 的缺点是其内部在实现 put 、 take 操作的时候使用的是同一个锁(显式锁),从而可能导致锁的高争用,进而导致较多的上下文切换。
  • LinkedBlockingQueue既能实现无界队列,也能实现有界队列。LinkedBlockingQueue的其中一个构造器允许我们创建队列的时候指定队列容量。 LinkedBlockingQueue 的优点是其内部在实现 put 、 take 操作的时候分别使用了两个显式锁 (putLock 和 takeLock), 这降低了锁争用的可能性。LinkedBlockingQueue 的内部存储空间是一个链表,而链表节点(对象)所需的存储空间是动态分配的,put 操作、 take 操作都会导致链表节点的动态创建和移除,因此 LinkedBlockingQueu 的缺点是它可能增加垃圾回收的负担。另外,由于LinkedBlockingQueue 的 put 、 take 操作使用的是两个锁,因此 LinkedBlockingQueue 维护其队列的当前长度 (size) 时无法使用一个普通的 int 型变量而是使用了一个原子变量(Atomiclnteger) 。 这个原子变量可能会被生产者线程和消费者线程争用,因此它可能导致额外的开销 。
  • SynchronousQueue可以被看作一种特殊的有界队列。SynchronousQueue 内部并不维护用于存储队列元素的存储空间 。 在使用 SynchronousQueue 作为传输通道的生产者一消费者模式中,生产者线程生产好一个产品之后,会等待消费者线程来取走这个产品才继续生产下一个产品,而不像使用ArrayBlockingQueue 、LinkedBlockingQueue 作为传输通道的情况下生产者线程将生产好的产品存入队列就继续生产下一个产品。因此,SynchronousQueue 适合于在消费者处理能力与生产者处理能力相差不大的情况下使用 。 否则,由于生产者线程执行 put 操作时没有消费者线程执行 take 操作,或者消费者线程执行 take 操作的时候没有生产者线程执行put 操作的概率比较大,从而可能导致较多的等待(这意味着上下文切换)。

ArrayBlockingQueue和SynchronousQueue都既支持非公平调度也支持公平调度,而 LinkedBlockingQueue 仅支持非公平调度 。

  • LinkedBlockingQueue 适合在生产者线程和消费者线程之间的并发程度比较大的情况下使用。
  • ArrayBlockingQueue 适合在生产者线程和消费者线程之间的并发程度较低的情况下使用。
  • SynchronousQueue 适合在消费者处理能力与生产者处理能力相差不大的情况下使用。

3.流量控制与信号量

使用无界队列作为传输通道的一个好处是 put 操作并不会导致生产者线程被阻塞。 因此,无界队列的使用不会影响生产者线程的步伐 。 但是在队列积压的情况下,无界队列中储的元素可能越来越多,最终导致这些元素所占用的资源过多 。 因此,一般我们在使用无界队列作为传输通道的时候会同时限制生产者的生产速率,即进行流扯控制以避免传输通道中积压过多的产品 。
JDK 1.5 中引入的标准库类 java. utiI.concurrent.Semaphore 可以用来实现流显控制。Semaphore 相当于虚拟资源配额管理器,它可以用来控制同一时间内对虚拟资源的访间次数 。为了对虚拟资源的访问进行流批控制,我们必须使相应代码只有在获得相应配额的情况下才能够访问这些资源。为此,相应代码在访问虚拟资源前必须先申请相应的配额,并在资源访问结束后返还相应的配额 。Semaphore.acquire()/release()分别用于申请配额和返还配额。Semaphore.acquire()在成功获得一个配额后会立即返回。如果当前的可用配额不足,那么Semaphore.acquire()会使其执行线程暂停。Semaphore内部会维护一个等待队列用于存储这些被暂停的线程。Semaphore.acquire() 在其返回之前总是会将当前的可用配额减少1。Semaphore.release()会使当前可用配额增加1, 并唤醒相应 Semaphore 实例的等待队列中的一个任意等待线程 。

  • Semaphore.acquire()和 Semaphore.release()总是配对使用的,这点需要由应用代码自身来保证。
  • Semaphore.release()调用总是应该放在—个finally块中,以避免虚拟资源访问出现异常的情况下当前线程所获得的配额无法返还。

4.双缓冲与Exchanger

在多线程环境下,有时候我们会使用两个(或者更多)缓冲区来实现数据从数据源到数据使用方的移动。其中一个缓冲区填充满来自数据源的数据后可以被数据使用方进行“消费”,而另外一个空的(或者已经使用过的)缓冲区则用来填充来自数据源的新的数据 。 这里,负责填充缓冲区的是一个线程( 生产者线程),而使用已填充完毕的另外一个缓冲区的则是另外一个线程(消费者线程) 。 因此,当消费者线程消费一个已填充的缓冲区时,另外一个缓冲区可以由生产者线程进行填充,从而实现了数据生成与消费的并发。这种缓冲技术就被称为双缓冲 (Double Buffering)。
JDK 1.5 中引入的标准库类 java.util.concurrent.Exchanger可以用来实现双缓冲。消费者线程执行Exchanger.exchange(V) 时将参数 x 指定为一个空的或者已经使用过的缓冲区,生产者线程执行Exchanger.exchange(V) 时将参数 x 指定为一个已经填充完毕的缓冲区。比照CyclicBarrier来说,生产者线程和消费者线程都执行到Exchanger. exchange(V)相当于这两个线程都到达了集合点,此时生产者线程和消费者线程各自对 Exchanger.exchange(V)的调用就会返回 。Exchanger.exchange(V)的返回值是对方线程执行该方法时所指定的参数 x 的值。