如何解决一个生产者与消费者问题

生产者与消费者问题是多线程同步的一个经典问题。生产者和消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区中取出商品。我们需要保证的是,当缓冲区满时,生产者不可生产商品;当缓冲区为空时,消费者不可取出商品。

下面介绍java中几种解决同步问题的方式

(1)wait()与notify()方法

(2)Lock与Condition机制

(3)BlockingQueue阻塞队列

 

【1】wait()与notify()方法

这两个方法是object类中的方法

wait()用在以下场合:

(1)当缓冲区满时,缓冲区调用wait()方法,使得生产者释放锁,当前线程阻塞,其他线程可以获得锁。

(2)当缓冲区空时,缓冲区调用wait()方法,使得消费者释放锁,当前线程阻塞,其他线程可以获得锁。

notify()用在以下场合:

(1)当缓冲区未满时,生产者生产商品放入缓冲区,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁,此处的对象锁,锁住的是缓冲区。

(2)当缓冲区不为空时,消费者从缓冲区中取出商品,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁。

代码演示:

package day1101;

import java.util.LinkedList;

/**
 * 生产者消费者问题
 */
public class ProAndCon {
    //最大容量
    public static final int MAX_SIZE = 2;
    //存储媒介
    public static LinkedList<Integer> list = new LinkedList<>();

    class Producer implements Runnable {
        @Override
        public void run() {
            synchronized (list) {
                //仓库容量已经达到最大值
                while (list.size() == MAX_SIZE) {
                    System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(1);
                System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
                list.notify();
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            synchronized (list) {
                while (list.size() == 0) {
                    System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
                    try {
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.removeFirst();
                System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
                list.notify();
            }
        }
    }


    public static void main(String[] args) {
        ProAndCon proAndCon = new ProAndCon();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
    }

}

运行结果:


【2】Lock与Condition机制

在JDK5.0之后,Java提供了Lock与Condition机制。Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()、nofity()相同,或者说可以取代它们,但是它们和Lock机制是直接挂钩的。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

代码演示:

package day1101;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProAndCon2 {
    public static final int MAX_SIZE = 2;
    public static LinkedList<Integer> list = new LinkedList<>();
    public static Lock lock = new ReentrantLock();
    //仓库满的条件变量
    public static Condition full = lock.newCondition();
    //仓库空的条件变量
    public static Condition empty = lock.newCondition();


    class Producer implements Runnable {

        @Override
        public void run() {
            lock.lock();
            while (list.size() == MAX_SIZE) {
                try {
                    System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(1);
            System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
            //唤醒其他生产者与消费者线程
            full.signal();
            empty.signal();
            lock.unlock();
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            lock.lock();
            while (list.size() == 0) {
                try {
                    System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.removeFirst();
            System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
            //唤醒其他生产者与消费者线程
            full.signal();
            empty.signal();
            lock.unlock();
        }
    }


    public static void main(String[] args) {
        ProAndCon2 proAndCon = new ProAndCon2();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
    }


}

运行结果:


【3】使用BlockingQueue阻塞队列

什么是阻塞队列?

如果向一个已经满了的队列中添加元素或者从空队列中移除元素,都将会导致线程阻塞,线程一直等待到有旧元素被移除或新元素被添加的时候,才能继续执行。符合这种情况的队列,称为阻塞队列。

JDK 1.5 以后新增BlockingQueue接口,我们采用它实现类的其中两个类,ArrayBlockingQueue或者是LinkedBlockingQueue。

怎么使用LinkedBlockingQueue?

这里我们用LinkedBlockingQueue来解决生产者与消费者问题,主要用到它的两个方法,即put()与take()

put():向阻塞队列中添加一个元素,队列满时,自动阻塞。

take():从阻塞队列中取出一个元素,队列空时,自动阻塞。

其实LinkedBlockingQueue底层使用的仍然是Lock与Condition机制,我们从源码就可以看出来

//..............用到了Lock与Condition机制

/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();




//...........put方法



/**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }




//...........take方法



 public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

看得出来,LinkedBlockingQueue底层已经解决好了同步问题,我们可以很方便的使用它。

代码演示:

package day1024;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 解决生产者与消费者问题
 * 采用阻塞队列BlockingQueue
 */
public class ProAndCon3 {
    public static final int MAX_SIZE = 2;
    public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);

    class Producer implements Runnable {
        @Override
        public void run() {
            if (queue.size() == MAX_SIZE) {
                System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
            }
            try {
                queue.put(1);
                System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + queue.size());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    class Consumer implements Runnable {
        @Override
        public void run() {
            if (queue.size() == 0) {
                System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
            }
            try {
                queue.take();
                System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + queue.size());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        ProAndCon3 proAndCon = new ProAndCon3();
        Producer producer = proAndCon.new Producer();
        Consumer consumer = proAndCon.new Consumer();

        for (int i = 0; i < 10; i++) {
            Thread pro = new Thread(producer);
            pro.start();
            Thread con = new Thread(consumer);
            con.start();
        }
        
    }
}

运行结果就不贴了。