Java线程中使用BlockingQueue的生产者消费者解决方案
生产者-消费者问题 是一个同步问题,当一个或多个线程生成数据并将其放置在缓冲区中,并且同时一个或多个线程从同一缓冲区消耗数据时,就会出现同步问题。
这样做可能会导致线程相互竞争以完成其任务的竞争条件。在这种情况下,没有什么能阻止他们同时进入方法并产生错误的结果。
此外,由于缺乏线程间通信,即使缓冲区为空,消费者也可能会尝试删除元素。类似地,生产者可能会在缓冲区已满时尝试添加元素。
可能的解决方案:
- 在删除和添加之前检查缓冲区的大小似乎是一个解决方案。生产者-消费者系统通常使用无限 while 循环。检查循环每次迭代的大小将是低效的。此外,无法保证线程安全。因此不使用此解决方案。
- wait() 和 notify()方法可用于建立线程间通信。
- BlockingQueue是 wait() 和 notify() 的一种不太复杂的线程安全替代方案。本文将讨论此方法。
阻塞队列:
BlockingQueue 接口是Java.util.concurrent 包的一部分。
- 如果生产者线程试图将一个元素放入一个完整的 BlockingQueue,它会被阻塞并保持阻塞状态,直到消费者移除一个元素。
- 类似地,如果消费者线程试图从空的 BlockingQueue 中获取一个元素,它会被阻塞并保持阻塞状态,直到生产者添加一个元素。
BlockingQueue 有两个主要方法,即 put() 和 take()
放()
void put(E e) throws InterruptedException
e is the element to be added
InterruptedException is thrown if the thread is interrupted while waiting
拿()
E take() throws InterruptedException
returns head of the queue
InterruptedException is thrown if the thread is interrupted while waiting
BlockingQueue 也有add(E e)和remove()方法。但是这些方法不能用于生产者-消费者问题,因为:
- 如果队列已满,add 将抛出 IllegalStateException。
- remove 返回一个布尔值,但要返回一个元素。
BlockingQueue的实现
由于 BlockingQueue 是一个接口,我们不能创建它的对象。相反,我们可以创建实现 BlockingQueue 的类之一的对象。对于此演示,将使用 ArrayBlockingQueue。
数组阻塞队列
- 顾名思义,ArrayBlockingQueue 使用数组数据结构作为缓冲区。
- 由于它是一个数组,它的容量在声明后是固定的。
- 它提供了公平性作为一种选择。这意味着线程可以在先到先得的基础上访问缓冲区。公平性默认关闭。可以通过在构造函数中放置布尔值 true 来打开它。
生产-消费者解决方案
现在我们了解了 BlockingQueue 是什么及其用法。让我们应用这些知识来解决生产者-消费者问题。为了方便起见,我们可以通过为生产者和消费者创建一个单独的类来将这个问题分成两个子问题。生产者和消费者将由不同的线程操作,但将共享一个公共 BlockingQueue 缓冲区。
生产者:顾名思义,生产者类会产生数据。在我们的例子中,生产者类正在产生范围 [1,4] 中的数字。它将将此数据放置在 BlockingQueue 缓冲区中。
Java
// Java program to demonstrate producer code
// Implement Runnable since object
// of this class will be executed by
// a separate thread
class producer implements Runnable {
BlockingQueue obj;
public producer(BlockingQueue obj)
{
// accept an ArrayBlockingQueue object from
// constructor
this.obj = obj;
}
@Override public void run()
{
// Produce numbers in the range [1,4]
// and put them in the buffer
for (int i = 1; i <= 4; i++) {
try {
obj.put(i);
System.out.println("Produced " + i);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Java
// Java program to demonstrate consumer code
// Implement Runnable since object
// of this class will be executed by
// a separate thread
class consumer implements Runnable {
BlockingQueue obj;
// Initialize taken to -1
// to indicate that no number
// has been taken so far.
int taken = -1;
public consumer(BlockingQueue obj)
{
// accept an ArrayBlockingQueue object from
// constructor
this.obj = obj;
}
@Override public void run()
{
// Take numbers from the buffer and
// print them, if the last number taken
// is 4 then stop
while (taken != 4) {
try {
taken = obj.take();
System.out.println("Consumed " + taken);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Java
// Java Program to demonstrate producer consumer
// problem solution
// Import the BlockingQueue interface and
// ArrayBlockingQueue class
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Create a Main class for execution
public class Main {
public static void main(String[] args)
{
// Create an ArrayBlockingQueue object with capacity
// 4
BlockingQueue bqueue
= new ArrayBlockingQueue(4);
// Create 1 object each for producer
// and consumer and pass them the common
// buffer created above
producer p1 = new producer(bqueue);
consumer c1 = new consumer(bqueue);
// Create 1 thread each for producer
// and consumer and pass them their
// respective objects.
Thread pThread = new Thread(p1);
Thread cThread = new Thread(c1);
// Start both threads
pThread.start();
cThread.start();
}
}
class producer implements Runnable {
BlockingQueue obj;
public producer(BlockingQueue obj)
{
this.obj = obj;
}
@Override public void run()
{
for (int i = 1; i <= 4; i++) {
try {
obj.put(i);
System.out.println("Produced " + i);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class consumer implements Runnable {
BlockingQueue obj;
int taken = -1;
public consumer(BlockingQueue obj)
{
this.obj = obj;
}
@Override public void run()
{
while (taken != 4) {
try {
taken = obj.take();
System.out.println("Consumed " + taken);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:消费者将从 BlockingQueue 缓冲区中获取数据。在我们的例子中,这些数据将被简单地打印出来。
Java
// Java program to demonstrate consumer code
// Implement Runnable since object
// of this class will be executed by
// a separate thread
class consumer implements Runnable {
BlockingQueue obj;
// Initialize taken to -1
// to indicate that no number
// has been taken so far.
int taken = -1;
public consumer(BlockingQueue obj)
{
// accept an ArrayBlockingQueue object from
// constructor
this.obj = obj;
}
@Override public void run()
{
// Take numbers from the buffer and
// print them, if the last number taken
// is 4 then stop
while (taken != 4) {
try {
taken = obj.take();
System.out.println("Consumed " + taken);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
现在让我们创建一个 ArrayBlockingQueue 缓冲区对象,每个线程用于生产者和消费者,并执行解决方案。
生产者-消费者问题解决方案:
Java
// Java Program to demonstrate producer consumer
// problem solution
// Import the BlockingQueue interface and
// ArrayBlockingQueue class
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Create a Main class for execution
public class Main {
public static void main(String[] args)
{
// Create an ArrayBlockingQueue object with capacity
// 4
BlockingQueue bqueue
= new ArrayBlockingQueue(4);
// Create 1 object each for producer
// and consumer and pass them the common
// buffer created above
producer p1 = new producer(bqueue);
consumer c1 = new consumer(bqueue);
// Create 1 thread each for producer
// and consumer and pass them their
// respective objects.
Thread pThread = new Thread(p1);
Thread cThread = new Thread(c1);
// Start both threads
pThread.start();
cThread.start();
}
}
class producer implements Runnable {
BlockingQueue obj;
public producer(BlockingQueue obj)
{
this.obj = obj;
}
@Override public void run()
{
for (int i = 1; i <= 4; i++) {
try {
obj.put(i);
System.out.println("Produced " + i);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class consumer implements Runnable {
BlockingQueue obj;
int taken = -1;
public consumer(BlockingQueue obj)
{
this.obj = obj;
}
@Override public void run()
{
while (taken != 4) {
try {
taken = obj.take();
System.out.println("Consumed " + taken);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 1
Consumed 2
Consumed 3
Consumed 4
笔记:
- 上述程序可能会在每次运行时给出不同的生产和消费顺序。但值得注意的是,所有产生的数字都会被消耗掉,并且不会出现任何线程间通信问题。
- 毒元素:此元素表示生产-消费活动的结束,在上例中,4 是毒元素。
- 如果事先不知道元素的数量,可以使用LinkedBlockingQueue 。