MQ(Message Queue)
简介
MQ用于分布式系统之间通信,是个存储消息的容器。
优势与劣势
优势
- 系统解耦
- 使系统之间支持异步调用,提高了系统的吞吐量以及用户体验
- 削峰填谷,保证系统稳定运行,提高了系统的稳定性
劣势
- 使系统的可用性降低(依赖外部系统越多,可用性越低),需要保证MQ的高可用。
- 系统的复杂性变高,且需要保证消息的不被丢失。
RabbitMQ
由Rabbit公司使用Erlang语言开发的MQ产品。协议支持:AMQP, XMPP, SMTP, STOMP,其单机吞吐量可达万级,消息延迟微秒级。
优势:并发能力强,性能好,延时低, 管理界面丰富。
AMQP(Advanced Message Queuing Protocol)协议
高级消息队列协议,2006年发布,类比HTTP。为面向消息的中间件设计。
RabbitMQ中的角色
- Broker
即RabbitMQ Server - Virtual Host
为多用户提供服务时,可以为每个用户创建一个虚拟主机,可以起到数据隔离的效果。 - Connection
publisher/consumer与broker之间的TCP连接。 - Channel
由connection创建的虚拟连接,多线程时,会为每个线程创建一个channel。作为轻量级的connection实现,极大减少了系统为了创建connection时的开销。 - Exchange
交换机根据分发规则,匹配查询表中的Routing Key,将消息分发到队列中。交换机常见类型有:direct,topic以及fanout。 - Queue
消息存放的地方。 - Binding
包含Routing Key,保存到交换机的查询表中。用作消息的分发依据。
常见的5种工作模式
// 准备一个创建连接的工具类 public class RabbitMqUtil { public static Connection getConnection() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("heoller"); connectionFactory.setPassword("heoller"); connectionFactory.setVirtualHost("study"); Connection connection = null; try { connection = connectionFactory.newConnection(); } catch (Exception e) { // todo logger } return connection; } }
简单模式(Hello World)
// 发布消息 public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); /* * 队列声明,不存在则创建 * 参数一: 队列名称 * 参数二: 是否持久化 * 参数三: 是否独占,false表示所有消费者都可消费,true表示只有第一个消费者才可以消费 * 参数四: 是否自动删除,false表示连接停掉后,不删除队列 * 参数五: 额外属性 */ channel.queueDeclare("simple", false, false, false, null); String message = "heoller is a handsome guy"; /* * 生产消息 * 参数一:交换机 * 参数二:队列名称 * 参数三:额外属性 * 参数四:消息的字节数组 */ channel.basicPublish("", "simple", null, message.getBytes()); channel.close(); connection.close(); } } // 消费消息 public class Consumer { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); // 队列声明,不存在则创建 channel.queueDeclare("simple", false, false, false, null); /* * 消费消息 * 参数一:队列名 * 参数二:是否自动确认 * 参数三:DefaultConsumer实现 */ channel.basicConsume("simple", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); System.out.println("消费的消息标签: " + envelope.getDeliveryTag()); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
工作队列模式(Work Queue)
和简单模式相比,工作队列模式是由多个消费端同时消费同一个队列的消息。// 消息生产者 public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("workqueue", false, false, false, null); String message = "My name is heoller"; // 发布100条消息到workqueue队列中 for (int i = 0; i < 100; i++) { channel.basicPublish("", "workqueue", null, message.getBytes()); } channel.close(); connection.close(); } } // 消息消费者A public class ConsumerA { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("workqueue", false, false, false, null); /* * 若未设置channel.basicQos(1),MQ会将消息平均分给所有消费者,设置channel.basicQos(1)则表示消费完1条再去取1条 * 好处是,消费速度快的消费者会多消费,分担了消费慢的消费者的压力,总体加快了队列任务的处理时长。 */ // channel.basicQos(1); channel.basicConsume("workqueue", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); System.out.println("消费的消息标签: " + envelope.getDeliveryTag()); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } } // 消息消费者B public class ConsumerB { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("workqueue", false, false, false, null); channel.basicConsume("workqueue", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); System.out.println("消费的消息标签: " + envelope.getDeliveryTag()); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
发布订阅模式(Pub/Sub)
// 发布者 public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 声明交换机, 类型 fanout channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout"); // 往交换机上发送数据 channel.basicPublish(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "", null, ("My name is heoller").getBytes()); channel.close(); connection.close(); } } // 消费者A public class ConsumerA { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, false, false, false, null); // 声明交换机, 类型 fanout channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout"); // 将交换机和队列声明绑定关系 channel.queueBind(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, ""); channel.basicQos(1); channel.basicConsume(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费了一个消息,消息内容:" + message); // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }); } } // 消费者B public class ConsumerB { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, false, false, false, null); // 声明交换机, 类型 fanout channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout"); // 将交换机和队列声明绑定关系 channel.queueBind(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, ""); channel.basicQos(1); channel.basicConsume(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费了一个消息,消息内容:" + message); // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
路由模式(Routing)
public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); // 声明交换机, 类型 direct channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct"); Map<String, String> dataList = new HashMap<>(); dataList.put("com.heoller.name", "His name is heoller"); dataList.put("com.heoller.age", "He is 18 years old"); dataList.put("com.heoller.position", "He work in suning on java"); dataList.forEach((routingKey, value) -> { // 往交换机上发送数据 try { channel.basicPublish(RabbitMQConstant.EXCHANGE_NAME_ROUTING, routingKey, null, value.getBytes()); } catch (IOException e) { e.printStackTrace(); } }); channel.close(); connection.close(); System.out.println("生产一条消息"); } } // 消费者A public class ConsumerA { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_ROUTING_A, false, false, false, null); // 声明交换机, 类型 direct channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct"); // 将交换机和队列声明绑定关系 channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_A, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.name"); channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_A, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.age"); channel.basicQos(1); channel.basicConsume(RabbitMQConstant.QUEUE_NAME_ROUTING_A, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费了一个消息,消息内容:" + message); // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }); } } // 消费者B public class ConsumerB { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_ROUTING_B, false, false, false, null); // 声明交换机, 类型 direct channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct"); // 将交换机和队列声明绑定关系 channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_B, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.name"); channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_B, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.position"); channel.basicQos(1); channel.basicConsume(RabbitMQConstant.QUEUE_NAME_ROUTING_B, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费了一个消息,消息内容:" + message); // 手动ACK channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
主题模式(Topics)
// 发布消息 public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "exchange-topic"; // 声明一个交换机,类型 topic channel.exchangeDeclare(exchangeName, "topic"); // 消息列表 Map<String, String> dataList = new HashMap<>(); dataList.put("com.heoller", "com.heoller"); dataList.put("com.heoller.name", "com.heoller.name"); dataList.put("com.heoller.age", "com.heoller.age"); dataList.put("cn.heoller.age", "cn.heoller.age"); // 生产消息 dataList.forEach((routingKey, message) -> { try { channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); } catch (IOException e) { // todo logger } }); channel.close(); connection.close(); } } // 消费消息 public class ConsumerA { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); String queueName = "topic-queue-a"; // 声明队列 channel.queueDeclare(queueName, false, false, false, null); String exchangeName = "exchange-topic"; // 声明一个交换机,类型 topic channel.exchangeDeclare(exchangeName, "topic"); // 根据routingKey 绑定队列到交换机 // *号匹配单层,#号匹配0层及以上 channel.queueBind(queueName, exchangeName, "*.heoller.#"); channel.basicQos(1); channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } } // 消费消息 public class ConsumerB { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); String queueName = "topic-queue-b"; // 声明队列 channel.queueDeclare(queueName, false, false, false, null); String exchangeName = "exchange-topic"; // 声明一个交换机,类型 topic channel.exchangeDeclare(exchangeName, "topic"); // 根据routingKey 绑定队列到交换机 channel.queueBind(queueName, exchangeName, "cn.#"); channel.basicQos(1); channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
消息确认机制
RabbitMQ为了确认消息是否被成功的投递到Broker中,RabbitMQ使用监听器(Listener)机制确认消息投递的状态。
消息投递的两种状态-Confirm & Return
- Confirm
表示生产者将消息投递到Broker时的状态,会有两种状态,ack(broker成功接收)和nack(broker因某种原因拒收了消息) - Return
表示消息被broker正常ack后,但发现没有可投递的队列时所表现的状态,此时消息也会被退回给生产者。
以上两种状态是发生在生产者和Broker之间的,不涉及消费者。
// 发布消息 public class Publisher { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "exchange-topic"; // 声明一个交换机,类型 topic channel.exchangeDeclare(exchangeName, "topic"); // 开启监听模式 channel.confirmSelect(); // 添加监听器 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息已被Broker接收"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息已被Broker拒收"); } }); // add return callback channel.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) { System.out.println("消息被退回"); } }); // 消息列表 Map<String, String> dataList = new HashMap<>(); dataList.put("com.heoller", "com.heoller"); // 消费者A匹配到com.heoller, 所以该条消息会被return给生产者 dataList.put("cn.heoller", "cn.heoller"); // 生产消息 dataList.forEach((routingKey, message) -> { try { // 第三个参数mandatory设置为true表示消息无法投递到队列时,会回调生产者的returnCallBack。设置为false,则直接丢弃消息。 channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes()); } catch (Exception e) { e.printStackTrace(); } }); } } public class ConsumerA { public static void main(String[] args) throws Exception { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); String queueName = "topic-queue-a"; // 声明队列 channel.queueDeclare(queueName, false, false, false, null); String exchangeName = "exchange-topic"; channel.exchangeDeclare(exchangeName, "topic"); channel.queueBind(queueName, exchangeName, "com.*"); channel.basicQos(1); channel.basicConsume(queueName, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费的消息内容: " + message); // 参数二false表示只确认当前消费的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }