分析
- 多个线程负责生产消息,多个线程负责发送消息
- 生产和发送消息都需要加锁
- 如果没有消息需要发送,则发送线程进入等待;如果队列已满,则生产线程进入等待
编码
需要注意:
- <mark>在线程调用的方法(如 put/take)中,不要用 try…catch… 就地吞掉 InterruptedException,而是将异常向上抛出,由线程的 run() 逻辑统一处理。否则中断信号会丢失,造成线程无法正常关闭等问题</mark>
/**
 * A bounded FIFO queue of {@link Message} objects shared between producer and consumer threads.
 *
 * <p>All access is guarded by the intrinsic lock of the backing list. {@link #put(Message)}
 * waits while the queue holds {@code limit} messages and {@link #take()} waits while it is
 * empty. Both propagate {@link InterruptedException} instead of catching it, so the calling
 * thread decides how to react to an interrupt.
 */
public class MessageQueue {

    /** Maximum number of messages used by the no-argument constructor. */
    private final static int DEFAULT_MAX_LIMIT = 100;

    /** Backing storage; its monitor is also the lock and wait/notify condition. */
    private final LinkedList<Message> messages;

    /** Maximum number of messages the queue may hold at once. */
    private final int limit;

    /** Creates a queue that holds at most {@code DEFAULT_MAX_LIMIT} messages. */
    public MessageQueue() {
        this(DEFAULT_MAX_LIMIT);
    }

    /**
     * Creates a queue that holds at most {@code limit} messages.
     *
     * @param limit maximum number of queued messages; must be positive
     * @throws IllegalArgumentException if {@code limit} is zero or negative
     */
    public MessageQueue(final int limit) {
        // With a non-positive limit the "queue is full" condition in put() is always true,
        // so every producer would wait forever. Fail fast instead.
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive: " + limit);
        }
        this.limit = limit;
        this.messages = new LinkedList<>();
    }

    /**
     * Appends a message to the tail of the queue, waiting while the queue is full.
     *
     * @param message the message to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(final Message message) throws InterruptedException {
        synchronized (messages) {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (messages.size() >= limit) {
                messages.wait();
            }
            messages.addLast(message);
            // Producers and consumers wait on the same monitor, so wake everyone.
            messages.notifyAll();
        }
    }

    /**
     * Removes and returns the message at the head of the queue, waiting while it is empty.
     *
     * @return the oldest queued message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Message take() throws InterruptedException {
        synchronized (messages) {
            while (messages.isEmpty()) {
                messages.wait();
            }
            Message message = messages.removeFirst();
            messages.notifyAll();
            return message;
        }
    }

    /**
     * Returns the maximum number of messages this queue may hold.
     *
     * @return the configured limit
     */
    public int getMaxLimit() {
        return limit;
    }

    /**
     * Returns the number of messages currently queued.
     *
     * @return the current queue size, read under the queue lock
     */
    public int getMessagesSize() {
        synchronized (messages) {
            return messages.size();
        }
    }
}
/**
 * Worker thread that repeatedly takes messages from a shared {@link MessageQueue} and prints
 * them, pausing for a random time of less than one second after each message.
 *
 * <p>The thread runs until it is interrupted, then prints a shutdown line and terminates.
 */
public class ConsumerThread extends Thread {

    /** Source of the random pause between two takes; shared by all consumers. */
    private final static Random random = new Random(System.currentTimeMillis());

    /** Queue this consumer takes messages from. */
    private final MessageQueue messageQueue;

    /**
     * Creates a consumer named {@code "Consumer-" + seq}.
     *
     * @param messageQueue queue to take messages from
     * @param seq sequence number used in the thread name
     */
    public ConsumerThread(MessageQueue messageQueue, int seq) {
        super("Consumer-" + seq);
        this.messageQueue = messageQueue;
    }

    /**
     * Consumes messages until this thread is interrupted.
     *
     * <p>Interruption is handled in one place: an {@link InterruptedException} from
     * {@code take()} or {@code sleep()} and an interrupt flag seen by the loop condition both
     * lead to the same shutdown line being printed.
     */
    @Override
    public void run() {
        try {
            while (!this.isInterrupted()) {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + "get Message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // The exception clears the interrupt flag; restore it so the status stays visible.
            Thread.currentThread().interrupt();
        }
        // Printed on every exit path, not only when the interrupt arrived while blocked.
        System.out.println(Thread.currentThread().getName() + " 关闭");
    }
}
/**
 * Worker thread that repeatedly creates messages and puts them into a shared
 * {@link MessageQueue}, pausing for a random time of less than one second after each message.
 *
 * <p>The thread runs until it is interrupted, then prints a stop line and terminates.
 */
public class ProducerThread extends Thread {

    /** Sequence for message payloads; static, so numbers are unique across all producers. */
    private final static AtomicInteger counter = new AtomicInteger(0);

    /** Source of the random pause between two puts; shared by all producers. */
    private final static Random random = new Random(System.currentTimeMillis());

    /** Queue this producer puts messages into. */
    private final MessageQueue messageQueue;

    /**
     * Creates a producer named {@code "Producer-" + seq}.
     *
     * @param messageQueue queue to put messages into
     * @param seq sequence number used in the thread name
     */
    public ProducerThread(MessageQueue messageQueue, int seq) {
        super("Producer-" + seq);
        this.messageQueue = messageQueue;
    }

    /**
     * Produces messages until this thread is interrupted.
     *
     * <p>The interrupt flag is checked before each message is created, so no further message
     * is produced once an interrupt has been observed. An {@link InterruptedException} from
     * {@code put()} or {@code sleep()} ends the loop the same way.
     */
    @Override
    public void run() {
        try {
            while (!this.isInterrupted()) {
                Message message = new Message("message" + counter.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName()+"put Message" + message.getData());
                Thread.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            // The exception clears the interrupt flag; restore it so the status stays visible.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + "停止");
    }
}
/**
 * Demo entry point: runs one producer and five consumers against a queue of capacity 5 for
 * five seconds, then stops the workers.
 */
public class Test {

    /**
     * Starts the workers, lets them run for five seconds, interrupts them and waits for them
     * to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws InterruptedException {
        final MessageQueue messageQueue = new MessageQueue(5);

        // Keep a reference to every worker so that exactly these threads can be stopped.
        // Add more ProducerThread entries here to run several producers.
        final Thread[] workers = {
            new ProducerThread(messageQueue, 1),
            new ConsumerThread(messageQueue, 1),
            new ConsumerThread(messageQueue, 2),
            new ConsumerThread(messageQueue, 3),
            new ConsumerThread(messageQueue, 4),
            new ConsumerThread(messageQueue, 5),
        };
        for (Thread worker : workers) {
            worker.start();
        }

        Thread.sleep(5000);

        // Interrupt only the threads started above. Interrupting the whole thread group
        // would also hit the main thread and any unrelated thread in the same group.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        // Wait until every worker has actually terminated.
        for (Thread worker : workers) {
            worker.join();
        }
    }
}