分析

  • 多个线程负责生产消息,多个线程负责发送消息
  • 生产和发送消息都需要加锁
  • 如果队列中没有待发送的消息，发送线程进入等待（wait），直到有新消息放入后被唤醒；同理，队列已满时，生产线程也要等待，直到有消息被取走

编码

需要注意

  • <mark>被线程调用的方法（例如 put、take）不要在内部用 try…catch… 吞掉异常，而是将异常（如 InterruptedException）抛出，由线程的 run() 处理逻辑统一处理。否则中断信号会丢失，造成线程无法正常关闭等异常情况</mark>
/**
 * A bounded, blocking FIFO queue of {@link Message}s.
 *
 * <p>Every access to the backing list happens while holding that list's monitor.
 * {@link #put(Message)} blocks while the queue is full and {@link #take()} blocks
 * while it is empty; both wait on, and notify through, the same monitor.
 */
public class MessageQueue {

    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_MAX_LIMIT = 100;

    /** Backing storage; also serves as the lock and the wait/notify monitor. */
    private final LinkedList<Message> messages;

    /** Maximum number of messages the queue may hold at once. */
    private final int limit;

    /** Creates a queue with the default capacity. */
    public MessageQueue() {
        this(DEFAULT_MAX_LIMIT);
    }

    /**
     * Creates a queue that holds at most {@code limit} messages.
     *
     * @param limit maximum number of queued messages; must be positive
     * @throws IllegalArgumentException if {@code limit} is zero or negative
     */
    public MessageQueue(final int limit) {
        // With a non-positive limit the "queue is full" condition in put() is
        // always true, so every producer would block forever. Fail fast instead.
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive: " + limit);
        }
        this.limit = limit;
        this.messages = new LinkedList<>();
    }

    /**
     * Appends a message to the tail of the queue, waiting while the queue is full.
     *
     * @param message the message to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(final Message message) throws InterruptedException {
        synchronized (messages) {
            // Loop, not "if": notifyAll() wakes producers and consumers alike, so the
            // condition must be re-checked after every wake-up.
            while (messages.size() >= limit) {
                messages.wait();
            }
            messages.addLast(message);
            messages.notifyAll();
        }
    }

    /**
     * Removes and returns the message at the head of the queue, waiting while the
     * queue is empty.
     *
     * @return the oldest queued message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Message take() throws InterruptedException {
        synchronized (messages) {
            while (messages.isEmpty()) {
                messages.wait();
            }
            final Message head = messages.removeFirst();
            // A slot was freed: wake any producer blocked in put().
            messages.notifyAll();
            return head;
        }
    }

    /**
     * Returns the maximum number of messages this queue may hold.
     *
     * @return the configured capacity
     */
    public int getMaxLimit() {
        return limit;
    }

    /**
     * Returns the number of messages currently queued.
     *
     * @return the current queue size
     */
    public int getMessagesSize() {
        synchronized (messages) {
            return messages.size();
        }
    }

}
/**
 * Worker thread that repeatedly takes messages from a shared {@link MessageQueue}
 * and prints them, pausing for a random interval (under one second) after each one.
 *
 * <p>The thread stops when it is interrupted, whether the interrupt is observed by
 * the loop condition or delivered as an {@link InterruptedException} from
 * {@code take()} or {@code sleep()}.
 */
public class ConsumerThread extends Thread {

    /** Source of the random pause between messages; shared by all consumers. */
    private static final Random random = new Random(System.currentTimeMillis());

    /** Queue this consumer takes messages from. */
    private final MessageQueue messageQueue;

    /**
     * Creates a consumer named {@code "Consumer-" + seq}.
     *
     * @param messageQueue the queue to take messages from
     * @param seq          sequence number used in the thread name
     */
    public ConsumerThread(MessageQueue messageQueue, int seq) {
        super("Consumer-" + seq);
        this.messageQueue = messageQueue;
    }

    /**
     * Takes and prints messages until interrupted, then prints a shutdown line.
     */
    @Override
    public void run() {
        try {
            while (!this.isInterrupted()) {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + "get Message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // The exception clears the interrupt flag; restore it so the status
            // remains visible to anything that inspects this thread afterwards.
            Thread.currentThread().interrupt();
        }
        // Printed on every exit path, not only when an exception was thrown.
        System.out.println(Thread.currentThread().getName() + " 关闭");
    }
}
/**
 * Worker thread that repeatedly creates messages and puts them on a shared
 * {@link MessageQueue}, pausing for a random interval (under one second) after each one.
 *
 * <p>The thread stops when it is interrupted, whether the interrupt is observed by
 * the loop condition or delivered as an {@link InterruptedException} from
 * {@code put()} or {@code sleep()}.
 */
public class ProducerThread extends Thread {

    /** Shared by all producers so that message payloads are numbered without repeats. */
    private static final AtomicInteger counter = new AtomicInteger(0);

    /** Source of the random pause between messages; shared by all producers. */
    private static final Random random = new Random(System.currentTimeMillis());

    /** Queue this producer puts messages on. */
    private final MessageQueue messageQueue;

    /**
     * Creates a producer named {@code "Producer-" + seq}.
     *
     * @param messageQueue the queue to put messages on
     * @param seq          sequence number used in the thread name
     */
    public ProducerThread(MessageQueue messageQueue, int seq) {
        super("Producer-" + seq);
        this.messageQueue = messageQueue;
    }

    /**
     * Produces and enqueues messages until interrupted, then prints a stop line.
     */
    @Override
    public void run() {
        try {
            // Check the interrupt flag in the loop condition instead of throwing an
            // InterruptedException by hand purely to leave the loop.
            while (!this.isInterrupted()) {
                Message message = new Message("message" + counter.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName() + "put Message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // The exception clears the interrupt flag; restore it so the status
            // remains visible to anything that inspects this thread afterwards.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + "停止");
    }
}
/**
 * Demo entry point: starts producer and consumer threads on one shared
 * {@link MessageQueue}, lets them run for a fixed time, then shuts them down.
 */
public class Test {

    /** Capacity of the shared queue. */
    private static final int QUEUE_CAPACITY = 5;

    /** Number of producer threads to start; raise this to run more producers. */
    private static final int PRODUCER_COUNT = 1;

    /** Number of consumer threads to start. */
    private static final int CONSUMER_COUNT = 5;

    /** How long the workers run before being interrupted, in milliseconds. */
    private static final long RUN_MILLIS = 5000;

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     *                              or while waiting for the workers to finish
     */
    public static void main(String[] args) throws InterruptedException {
        final MessageQueue messageQueue = new MessageQueue(QUEUE_CAPACITY);

        // Keep a reference to every worker so that exactly these threads can be
        // stopped later.
        final Thread[] workers = new Thread[PRODUCER_COUNT + CONSUMER_COUNT];
        int next = 0;
        for (int seq = 1; seq <= PRODUCER_COUNT; seq++) {
            workers[next++] = new ProducerThread(messageQueue, seq);
        }
        for (int seq = 1; seq <= CONSUMER_COUNT; seq++) {
            workers[next++] = new ConsumerThread(messageQueue, seq);
        }

        for (Thread worker : workers) {
            worker.start();
        }

        Thread.sleep(RUN_MILLIS);

        // Interrupt only the threads started here. Interrupting the whole thread
        // group would also hit the main thread and any unrelated thread in it.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        // Wait for every worker to finish before main returns.
        for (Thread worker : workers) {
            worker.join();
        }
    }
}