MQ(Message Queue)

简介

MQ用于分布式系统之间通信,是个存储消息的容器。
图片说明

优势与劣势

优势
  • 系统解耦
  • 使系统之间支持异步调用,提高了系统的吞吐量以及用户体验
  • 削峰填谷,保证系统稳定运行,提高了系统的稳定性
劣势
  • 使系统的可用性降低(依赖外部系统越多,可用性越低),需要保证MQ的高可用。
  • 系统的复杂性变高,且需要保证消息的不被丢失。

RabbitMQ

由Rabbit公司使用Erlang语言开发的MQ产品。协议支持:AMQP, XMPP, SMTP, STOMP,其单机吞吐量可达万级,消息延迟微秒级。
优势:并发能力强,性能好,延时低, 管理界面丰富。

AMQP(Advanced Message Queuing Protocol)协议

高级消息队列协议,2006年发布,类比HTTP。为面向消息的中间件设计。

RabbitMQ中的角色

  • Broker
    即RabbitMQ Server
  • Virtual Host
    为多用户提供服务时,可以为每个用户创建一个虚拟主机,可以起到数据隔离的效果。
  • Connection
    publisher/consumer与broker之间的TCP连接。
  • Channel
    由connection创建的虚拟连接,多线程时,会为每个线程创建一个channel。作为轻量级的connection实现,极大减少了系统为了创建connection时的开销。
  • Exchange
    交换机根据分发规则,匹配查询表中的Routing Key,将消息分发到队列中。交换机常见类型有:direct,topic以及fanout。
  • Queue
    消息存放的地方。
  • Binding
    包含Routing Key,保存到交换机的查询表中。用作消息的分发依据。

常见的5种工作模式

// 准备一个创建连接的工具类
public class RabbitMqUtil {

    public static Connection getConnection() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("heoller");
        connectionFactory.setPassword("heoller");
        connectionFactory.setVirtualHost("study");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (Exception e) {
            // todo logger
        }
        return connection;
    }

}
  1. 简单模式(Hello World)
    simple

    // 发布消息
    public class Publisher {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         /*
          * 队列声明,不存在则创建
          * 参数一: 队列名称
          * 参数二: 是否持久化
          * 参数三: 是否独占,false表示所有消费者都可消费,true表示只有第一个消费者才可以消费
          * 参数四: 是否自动删除,false表示连接停掉后,不删除队列
          * 参数五: 额外属性
          */
         channel.queueDeclare("simple", false, false, false, null);
         String message = "heoller is a handsome guy";
         /*
          * 生产消息
          * 参数一:交换机
          * 参数二:队列名称
          * 参数三:额外属性
          * 参数四:消息的字节数组
          */
         channel.basicPublish("", "simple", null, message.getBytes());
         channel.close();
         connection.close();
     }
    }
    // 消费消息
    public class Consumer {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         // 队列声明,不存在则创建
         channel.queueDeclare("simple", false, false, false, null);
         /*
          * 消费消息
          * 参数一:队列名
          * 参数二:是否自动确认
          * 参数三:DefaultConsumer实现
          */
         channel.basicConsume("simple", false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费的消息内容: " + message);
                 System.out.println("消费的消息标签: " + envelope.getDeliveryTag());
                 // 参数二false表示只确认当前消费的消息
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
  2. 工作队列模式(Work Queue)
    work queue
    和简单模式相比,工作队列模式是由多个消费端同时消费同一个队列的消息。

    // 消息生产者
    public class Publisher {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare("workqueue", false, false, false, null);
         String message = "My name is heoller";
         // 发布100条消息到workqueue队列中
         for (int i = 0; i < 100; i++) {
             channel.basicPublish("", "workqueue", null, message.getBytes());   
         }
         channel.close();
         connection.close();
     }
    }
    // 消息消费者A
    public class ConsumerA {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare("workqueue", false, false, false, null);
         /*
          * 若未设置channel.basicQos(1),MQ会将消息平均分给所有消费者,设置channel.basicQos(1)则表示消费完1条再去取1条
          * 好处是,消费速度快的消费者会多消费,分担了消费慢的消费者的压力,总体加快了队列任务的处理时长。
          */
         // channel.basicQos(1);
         channel.basicConsume("workqueue", false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费的消息内容: " + message);
                 System.out.println("消费的消息标签: " + envelope.getDeliveryTag());
                 // 参数二false表示只确认当前消费的消息
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
    // 消息消费者B
    public class ConsumerB {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare("workqueue", false, false, false, null);
         channel.basicConsume("workqueue", false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费的消息内容: " + message);
                 System.out.println("消费的消息标签: " + envelope.getDeliveryTag());
                 // 参数二false表示只确认当前消费的消息
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
  3. 发布订阅模式(Pub/Sub)

    // 发布者
    public class Publisher {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         // 声明交换机, 类型 fanout
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout");
         // 往交换机上发送数据
         channel.basicPublish(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "", null, ("My name is heoller").getBytes());
         channel.close();
         connection.close();
     }
    }
    // 消费者A
    public class ConsumerA {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, false, false, false, null);
         // 声明交换机, 类型 fanout
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout");
         // 将交换机和队列声明绑定关系
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "");
         channel.basicQos(1);
         channel.basicConsume(RabbitMQConstant.QUEUE_NAME_PUB_SUB_A, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费了一个消息,消息内容:" + message);
                 // 手动ACK
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
    // 消费者B
    public class ConsumerB {
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, false, false, false, null);
         // 声明交换机, 类型 fanout
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "fanout");
         // 将交换机和队列声明绑定关系
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, RabbitMQConstant.EXCHANGE_NAME_PUB_SUB, "");
         channel.basicQos(1);
         channel.basicConsume(RabbitMQConstant.QUEUE_NAME_PUB_SUB_B, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费了一个消息,消息内容:" + message);
                 // 手动ACK
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
  4. 路由模式(Routing)

    public class Publisher {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         // 声明交换机, 类型 direct
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct");
    
         Map<String, String> dataList = new HashMap<>();
         dataList.put("com.heoller.name", "His name is heoller");
         dataList.put("com.heoller.age", "He is 18 years old");
         dataList.put("com.heoller.position", "He work in suning on java");
    
         dataList.forEach((routingKey, value) -> {
             // 往交换机上发送数据
             try {
                 channel.basicPublish(RabbitMQConstant.EXCHANGE_NAME_ROUTING, routingKey, null, value.getBytes());
             } catch (IOException e) {
                 e.printStackTrace();
             }
         });
    
         channel.close();
         connection.close();
         System.out.println("生产一条消息");
     }
    }
    // 消费者A
    public class ConsumerA {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_ROUTING_A, false, false, false, null);
         // 声明交换机, 类型 direct
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct");
         // 将交换机和队列声明绑定关系
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_A, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.name");
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_A, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.age");
    
         channel.basicQos(1);
         channel.basicConsume(RabbitMQConstant.QUEUE_NAME_ROUTING_A, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费了一个消息,消息内容:" + message);
                 // 手动ACK
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
    // 消费者B
    public class ConsumerB {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMQUtil.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(RabbitMQConstant.QUEUE_NAME_ROUTING_B, false, false, false, null);
         // 声明交换机, 类型 direct
         channel.exchangeDeclare(RabbitMQConstant.EXCHANGE_NAME_ROUTING, "direct");
         // 将交换机和队列声明绑定关系
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_B, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.name");
         channel.queueBind(RabbitMQConstant.QUEUE_NAME_ROUTING_B, RabbitMQConstant.EXCHANGE_NAME_ROUTING, "com.heoller.position");
    
         channel.basicQos(1);
         channel.basicConsume(RabbitMQConstant.QUEUE_NAME_ROUTING_B, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费了一个消息,消息内容:" + message);
                 // 手动ACK
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
  5. 主题模式(Topics)

    // 发布消息
    public class Publisher {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         String exchangeName = "exchange-topic";
         // 声明一个交换机,类型 topic
         channel.exchangeDeclare(exchangeName, "topic");
         // 消息列表
         Map<String, String> dataList = new HashMap<>();
         dataList.put("com.heoller", "com.heoller");
         dataList.put("com.heoller.name", "com.heoller.name");
         dataList.put("com.heoller.age", "com.heoller.age");
         dataList.put("cn.heoller.age", "cn.heoller.age");
         // 生产消息
         dataList.forEach((routingKey, message) -> {
             try {
                 channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
             } catch (IOException e) {
                 // todo logger
             }
         });
         channel.close();
         connection.close();
     }
    }
    // 消费消息
    public class ConsumerA {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         String queueName = "topic-queue-a";
         // 声明队列
         channel.queueDeclare(queueName, false, false, false, null);
         String exchangeName = "exchange-topic";
         // 声明一个交换机,类型 topic
         channel.exchangeDeclare(exchangeName, "topic");
         // 根据routingKey 绑定队列到交换机
         // *号匹配单层,#号匹配0层及以上
         channel.queueBind(queueName, exchangeName, "*.heoller.#");
         channel.basicQos(1);
         channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费的消息内容: " + message);
                 // 参数二false表示只确认当前消费的消息
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }
    // 消费消息
    public class ConsumerB {
    
     public static void main(String[] args) throws Exception {
         Connection connection = RabbitMqUtil.getConnection();
         Channel channel = connection.createChannel();
         String queueName = "topic-queue-b";
         // 声明队列
         channel.queueDeclare(queueName, false, false, false, null);
         String exchangeName = "exchange-topic";
         // 声明一个交换机,类型 topic
         channel.exchangeDeclare(exchangeName, "topic");
         // 根据routingKey 绑定队列到交换机
         channel.queueBind(queueName, exchangeName, "cn.#");
         channel.basicQos(1);
         channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                 String message = new String(body);
                 System.out.println("消费的消息内容: " + message);
                 // 参数二false表示只确认当前消费的消息
                 channel.basicAck(envelope.getDeliveryTag(), false);
             }
         });
     }
    }

消息确认机制

RabbitMQ为了确认消息是否被成功的投递到Broker中,RabbitMQ使用监听器(Listener)机制确认消息投递的状态。

消息投递的两种状态-Confirm & Return
  • Confirm
    表示生产者将消息投递到Broker时的状态,会有两种状态,ack(broker成功接收)和nack(broker因某种原因拒收了消息)
  • Return
    表示消息被broker正常ack后,但发现没有可投递的队列时所表现的状态,此时消息也会被退回给生产者。

以上两种状态是发生在生产者和Broker之间的,不涉及消费者。

// 发布消息
public class Publisher {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "exchange-topic";
        // 声明一个交换机,类型 topic
        channel.exchangeDeclare(exchangeName, "topic");
        // 开启监听模式
        channel.confirmSelect();
        // 添加监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息已被Broker接收");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息已被Broker拒收");
            }
        });
        // add return callback
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return returnMessage) {
                System.out.println("消息被退回");
            }
        });

        // 消息列表
        Map<String, String> dataList = new HashMap<>();
        dataList.put("com.heoller", "com.heoller");
        // 消费者A匹配到com.heoller, 所以该条消息会被return给生产者
        dataList.put("cn.heoller", "cn.heoller");
        // 生产消息
        dataList.forEach((routingKey, message) -> {
            try {
                // 第三个参数mandatory设置为true表示消息无法投递到队列时,会回调生产者的returnCallBack。设置为false,则直接丢弃消息。
                channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
public class ConsumerA {

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        String queueName = "topic-queue-a";
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        String exchangeName = "exchange-topic";
        channel.exchangeDeclare(exchangeName, "topic");
        channel.queueBind(queueName, exchangeName, "com.*");
        channel.basicQos(1);
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("消费的消息内容: " + message);
                // 参数二false表示只确认当前消费的消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}