如何解决一个生产者与消费者问题
生产者与消费者问题是多线程同步的一个经典问题。生产者和消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区中取出商品。我们需要保证的是,当缓冲区满时,生产者不可生产商品;当缓冲区为空时,消费者不可取出商品。
下面介绍java中几种解决同步问题的方式
(1)wait()与notify()方法
(2)Lock与Condition机制
(3)BlockingQueue阻塞队列
【1】wait()与notify()方法
这两个方法是object类中的方法
wait()用在以下场合:
(1)当缓冲区满时,缓冲区调用wait()方法,使得生产者释放锁,当前线程阻塞,其他线程可以获得锁。
(2)当缓冲区空时,缓冲区调用wait()方法,使得消费者释放锁,当前线程阻塞,其他线程可以获得锁。
notify()用在以下场合:
(1)当缓冲区未满时,生产者生产商品放入缓冲区,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁,此处的对象锁,锁住的是缓冲区。
(2)当缓冲区不为空时,消费者从缓冲区中取出商品,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁。
代码演示:
package day1101;
import java.util.LinkedList;
/**
* 生产者消费者问题
*/
public class ProAndCon {
//最大容量
public static final int MAX_SIZE = 2;
//存储媒介
public static LinkedList<Integer> list = new LinkedList<>();
class Producer implements Runnable {
@Override
public void run() {
synchronized (list) {
//仓库容量已经达到最大值
while (list.size() == MAX_SIZE) {
System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(1);
System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
list.notify();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
synchronized (list) {
while (list.size() == 0) {
System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.removeFirst();
System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
list.notify();
}
}
}
public static void main(String[] args) {
ProAndCon proAndCon = new ProAndCon();
Producer producer = proAndCon.new Producer();
Consumer consumer = proAndCon.new Consumer();
for (int i = 0; i < 10; i++) {
Thread pro = new Thread(producer);
pro.start();
Thread con = new Thread(consumer);
con.start();
}
}
}
运行结果:
【2】Lock与Condition机制
在JDK5.0之后,Java提供了Lock与Condition机制。Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()、nofity()相同,或者说可以取代它们,但是它们和Lock机制是直接挂钩的。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
代码演示:
package day1101;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProAndCon2 {
public static final int MAX_SIZE = 2;
public static LinkedList<Integer> list = new LinkedList<>();
public static Lock lock = new ReentrantLock();
//仓库满的条件变量
public static Condition full = lock.newCondition();
//仓库空的条件变量
public static Condition empty = lock.newCondition();
class Producer implements Runnable {
@Override
public void run() {
lock.lock();
while (list.size() == MAX_SIZE) {
try {
System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(1);
System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
//唤醒其他生产者与消费者线程
full.signal();
empty.signal();
lock.unlock();
}
}
class Consumer implements Runnable {
@Override
public void run() {
lock.lock();
while (list.size() == 0) {
try {
System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.removeFirst();
System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
//唤醒其他生产者与消费者线程
full.signal();
empty.signal();
lock.unlock();
}
}
public static void main(String[] args) {
ProAndCon2 proAndCon = new ProAndCon2();
Producer producer = proAndCon.new Producer();
Consumer consumer = proAndCon.new Consumer();
for (int i = 0; i < 10; i++) {
Thread pro = new Thread(producer);
pro.start();
Thread con = new Thread(consumer);
con.start();
}
}
}
运行结果:
【3】使用BlockingQueue阻塞队列
什么是阻塞队列?
如果向一个已经满了的队列中添加元素或者从空队列中移除元素,都将会导致线程阻塞,线程一直等待到有旧元素被移除或新元素被添加的时候,才能继续执行。符合这种情况的队列,称为阻塞队列。
JDK 1.5 以后新增BlockingQueue接口,我们采用它实现类的其中两个类,ArrayBlockingQueue或者是LinkedBlockingQueue。
怎么使用LinkedBlockingQueue?
这里我们用LinkedBlockingQueue来解决生产者与消费者问题,主要用到它的两个方法,即put()与take()
put():向阻塞队列中添加一个元素,队列满时,自动阻塞。
take():从阻塞队列中取出一个元素,队列空时,自动阻塞。
其实LinkedBlockingQueue底层使用的仍然是Lock与Condition机制,我们从源码就可以看出来
//..............用到了Lock与Condition机制
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
//...........put方法
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
//...........take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
看得出来,LinkedBlockingQueue底层已经解决好了同步问题,我们可以很方便的使用它。
代码演示:
package day1024;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 解决生产者与消费者问题
* 采用阻塞队列BlockingQueue
*/
public class ProAndCon3 {
public static final int MAX_SIZE = 2;
public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);
class Producer implements Runnable {
@Override
public void run() {
if (queue.size() == MAX_SIZE) {
System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
}
try {
queue.put(1);
System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
if (queue.size() == 0) {
System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
}
try {
queue.take();
System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ProAndCon3 proAndCon = new ProAndCon3();
Producer producer = proAndCon.new Producer();
Consumer consumer = proAndCon.new Consumer();
for (int i = 0; i < 10; i++) {
Thread pro = new Thread(producer);
pro.start();
Thread con = new Thread(consumer);
con.start();
}
}
}
运行结果就不贴了。