1 消息队列概述

1.1 消息队列MQ

MQ 全称为 Message Queue,消息队列是应用程序和应用程序之间的通信方法。

  • 为什么使用 MQ

在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

  • 消息队列应用场景

应用解耦、异步处理、流量削峰、日志处理、纯粹通信

1.2 AMQP和JMS

MQ 是消息通信的模型;实现 MQ 的大致有两种主流方式:AMQP、JMS。

  • AMQP

AMQP 高级消息队列协议,是一个进程间传递异步消息的网络协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。

  • JMS

JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

  • AMQP和JMS区别

JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。JMS 规定了两种消息模式(B2B、发布订阅);而 AMQP 的消息模式更加丰富

1.3 消息队列产品

  • kafka

Apache 下的一个子项目,使用 scala 实现的一个高性能分布式 Publish/Subscribe 消息队列系统。

1.快速持久化:通过磁盘顺序读写与零拷贝机制,可以在 O(1)的系统开销下进行消息持久化;

2.高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率;

3.高堆积:支持 topic 下消费者较长时间离线,消息堆积量大;

4.完全的分布式系统:Broker、Producer、Consumer 都原生自动支持分布式,依赖zookeeper 自动实现负载均衡;

5.支持 Hadoop 数据并行加载:对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

  • RocketMQ

RocketMQ 的前身是 Metaq,当 Metaq3.0 发布时,产品名称改为 RocketMQ。RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点 :

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的订阅者水平扩展能力

4.实时的消息订阅机制

5.支持事务消息

6.亿级消息堆积能力

  • RabbitMQ

使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了 Broker 架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的 ESB 整合。

1.4 rabbitMQj介绍

RabbitMQ 是由 erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ 提供了 6 种模式:简单模式,work 模式,Publish/Subscribe 发布与订阅模式,Routing 路由模式,Topics 主题模式,RPC 远程调用模式

alt

2 RabbitMQ入门(简单模式)

alt

P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
  • 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>rabbitMQ01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
    </dependencies>
</project>
  • 创建连接工具类
public class RabbitMQConUtil {
    private static final int PORT = 5672;
    private static final String HOST = "localhost";
    private static final String USER_NAME = "guest";
    private static final String PWD = "guest";
    private ConnectionFactory factory;
    private Connection connection;
    public RabbitMQConUtil(String virtualHost) {
        //创建链接工厂
        factory = new ConnectionFactory();
        //参数设置
        factory.setHost(HOST);//IP
        factory.setPort(PORT);//端口号
        factory.setUsername(USER_NAME);//用户
        factory.setPassword(PWD);//密码
        factory.setVirtualHost(virtualHost);//虚拟主机,相当于mysql中db
    }
    public RabbitMQConUtil(String host,int port,String userName,String pwd,String virtualHost){
        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(userName);
        factory.setPassword(pwd);
        factory.setVirtualHost(virtualHost);
    }
    public Connection getConnection() throws Exception {
        return factory.newConnection();
    }
}
  • 创建简单生产者
public class SimpleProduct {
    public static void main(String[] args) throws Exception {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        //创建频道-channel = connection.createChannel()
        Channel channel = connection.createChannel();
        //声明队列-channel.queueDeclare(名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare("simple_queue", true, false, false, null);
        //创建消息-String m = xxx
        String message = "欢迎来到我的世界";
        //消息发送-channel.basicPublish(交换机[默认 Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,消息内容)
        channel.basicPublish("", "simple_queue", null, message.getBytes("utf-8"));
        //关闭资源-channel.close();connection.close()
        channel.close();
        connection.close();
    }
}
  • 结果 alt

alt

  • 创建消费者
public class SimpleConsumer {
    public static void main(String[] args) throws Exception{
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("simple_queue", true, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的 key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息 ID
                long deliveryTag = envelope.getDeliveryTag();
                //获取消息信息
                String message = new String(body, "utf-8");
                System.out.println(
                        "routingKey:" + routingKey +
                                ",exchange:" + exchange +
                                ",deliveryTag:" + deliveryTag +
                                ",message:" + message);
            }
        };
        channel.basicConsume("simple_queue",true,consumer);
    }
}

3 RabbitMQ进阶学习

3.1 work queues工作队列模式

alt

Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。 应用场景:对于任务过重或任务较多情况,使用工作队列模式使用多个消费者可以提高任务处理的速度。

  • 代码实现

生产者

public class WorkProduct {
    public static void main(String[] args) throws Exception {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 10; i++) {
            channel.queueDeclare("work_queue",true,false,false,null);
            String mes = "welcome to my house:" + i;
            channel.basicPublish("","work_queue",null,mes.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

两个或多个消费者

public class WorkConsume1 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("work_queue", true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();

                    String exchange = envelope.getExchange();

                    long deliveryTag = envelope.getDeliveryTag();

                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("work_queue",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class RunClass {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        WorkConsume1 consume1 = new WorkConsume1();
        WorkConsume1 consume2 = new WorkConsume1();
        service.execute(consume1);
        service.execute(consume2);
    }
}

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

3.2 publish/subscribe发布订阅模式

alt 在发布订阅模型中,多了一个 x(exchange)角色,而且过程略有变化。

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)C:消费者,消息的接受者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。Exchange有常见以下 3 种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定 routing key 的队列
Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

alt

  • 代码实现

生产者,此时生产者连接的是交换机(exchange),不是队列

public class Product {
    public static void main(String[] args) throws Exception {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        Channel channel = connection.createChannel();
        //声明exchange
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);
        for (int i = 0; i < 10; i++) {
            String mes = "hello," + i + " welcome to China";
            channel.basicPublish("fanout_exchange", "", null, mes.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

消费者,声明队列,队列绑定交换机

public class Consume1 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue1", true, false, false, null);
            channel.queueBind("publish_queue1","fanout_exchange","key1");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();

                    String exchange = envelope.getExchange();

                    long deliveryTag = envelope.getDeliveryTag();

                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue1",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class Consume2 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue2", true, false, false, null);
            channel.queueBind("publish_queue2","fanout_exchange","key2");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();

                    String exchange = envelope.getExchange();

                    long deliveryTag = envelope.getDeliveryTag();

                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue2",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class RunConsume {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,60, TimeUnit.MINUTES,
                new LinkedBlockingDeque<Runnable>());
        Consume1 consume1 = new Consume1();
        Consume2 consume2 = new Consume2();
        executor.execute(consume1);
        executor.execute(consume2);
    }
}
  • 发布订阅模式与 work 队列模式的区别
1、work 队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,work 队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式的消费者需要设置队列和交换机的绑定,work 队列模式不需要设置,实际上work队列模式会将队列绑 定到默认的交换机 。

3.3 Routing 路由模式

路由模式特点:

1.队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
2.消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey。
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息

alt

P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列
C1:消费者,其所在队列指定了需要 routing key 为 log.error 的消息
C2:消费者,其所在队列指定了需要 routing key 为 log.info、log.error、log.warning 的消息
  • 代码实现

生产者,创建生产者向三个routing key发信息,在发布步骤添加routing key

public class Product {
    public static void main(String[] args) throws Exception {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT);
        for (int i = 0; i < 3; i++) {
            String routing_key = "";
            String mes = "hello:"+i;
            switch (i){
                case 0:
                    routing_key = "log.info";
                    break;
                case 1:
                    routing_key = "log.error";
                    break;
                case 2:
                    routing_key = "log.warning";
                    break;
            }
            channel.basicPublish("direct_exchange",routing_key,null,mes.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

消费者,队列绑定时声明routing key

//消费者1 routing key为log.error
public class Consume1 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue1", true, false, false, null);
            channel.queueBind("publish_queue1","direct_exchange","log.error");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();

                    String exchange = envelope.getExchange();

                    long deliveryTag = envelope.getDeliveryTag();

                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue1",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//消费者2
public class Consume2 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue2", true, false, false, null);
            channel.queueBind("publish_queue2","direct_exchange","log.info");
            channel.queueBind("publish_queue2","direct_exchange","log.error");
            channel.queueBind("publish_queue2","direct_exchange","log.warning");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();

                    String exchange = envelope.getExchange();

                    long deliveryTag = envelope.getDeliveryTag();

                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue2",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合routing key 的队列。

3.4 Topics通配符模式

alt Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange可以让队列在绑定 Routing key 的时候使用通配符! Routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割,例如:item.insert

通配符规则

  • #:匹配一个或多个词
  • *:匹配不多不少恰好 1 个词
举例:
item.#:能够匹配 item.insert.abc 或者 item.insert
item.*:只能匹配 item.insert

生产者,与路由模式类似

public class Product {
    public static void main(String[] args) throws Exception {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection = util.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
        for (int i = 0; i < 5; i++) {
            String routing_key = "";
            String mes = "hello:"+i;
            switch (i){
                case 0:
                    routing_key = "log.info";
                    break;
                case 1:
                    routing_key = "log.error";
                    break;
                case 2:
                    routing_key = "log.warning";
                    break;
                case 3:
                    routing_key = "log.info.bat";
                    break;
                case 4:
                    routing_key = "log.info.txt";
                    break;
            }
            channel.basicPublish("topic_exchange",routing_key,null,mes.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }
}

消费者

//消费者1,log.*
public class Consume1 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue1", true, false, false, null);
            channel.queueBind("publish_queue1","topic_exchange","log.*");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue1",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//消费者2 log.#
public class Consume2 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue2", true, false, false, null);
            channel.queueBind("publish_queue2","topic_exchange","log.#");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue2",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
//消费者3 log.info.*
public class Consume3 implements Runnable {
    public void run() {
        RabbitMQConUtil util = new RabbitMQConUtil("/firstVirtualhost");
        Connection connection;
        try {
            connection = util.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("publish_queue3", true, false, false, null);
            channel.queueBind("publish_queue3","topic_exchange","log.info.*");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String exchange = envelope.getExchange();
                    long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body, "utf-8");
                    System.out.println(
                            "routingKey:" + routingKey +
                                    ",exchange:" + exchange +
                                    ",deliveryTag:" + deliveryTag +
                                    ",message:" + message);
                }
            };
            channel.basicConsume("publish_queue3",true,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Topic 主题模式可以实现 Publish/Subscribe 发布订阅模式 和 Routing 路由模式的双重功能;只是Topic在配置 routing key 的时候可以使用通配符,显得更加灵活。

3.5 总结

  • 1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
  • 2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
  • 3、发布订阅模式 Publish/subscribe 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
  • 4、路由模式 Routing 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
  • 5、通配符模式 Topic 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

4 SpringBoot整合RabbitMQ

4.1 生产者

生成交换机、队列,绑定交换机与队列,发送消息到队列中

application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/firstVirtualhost

生成交换机、队列,绑定交换机与队列

@Configuration
public class ProductConfig {
    //创建交换机
    @Bean(name = "topicExchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("topic_exchange_springboot");
    }
    //创建队列
    @Bean(name = "topicQueue")
    public Queue queue(){
        return QueueBuilder.durable("topic_queue_springboot").build();
    }
    //绑定交换机与队列
    @Bean
    public Binding binding(@Qualifier("topicQueue") Queue queue,
                           @Qualifier("topicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("log.#").noargs();
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProductTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test(){
        rabbitTemplate.convertAndSend("topic_exchange_springboot","log.info","发送了info消息");
        rabbitTemplate.convertAndSend("topic_exchange_springboot","log.error","发送了error消息");
        rabbitTemplate.convertAndSend("topic_exchange_springboot","log.warning","发送了warning消息");
    }
}

4.2 消费者

重新开启一个session

@Component
public class ConsumerConfig {
    @RabbitListener(queues = "topic_queue_springboot")
    public void listener(String msg){
        System.out.println(msg);
    }
}

5 RabbitMQ的重要问题

1、什么是rabbitmq

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端的消息队列。

2、为什么要使用rabbitmq

  • 在分布式系统下具备异步,削峰,负载均衡等一系列高级功能;
  • 拥有持久化的机制,进程消息,队列中的信息也可以保存下来。
  • 实现消费者和生产者之间的解耦
  • 对于高并发场景下,利用消息队列可以使得同步访问变为串行访问达到一定量的限流,利于数据库的操作。
  • 可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。

3、使用rabbitmq的场景

  • 服务间异步通信
  • 顺序消费
  • 定时任务
  • 请求削峰

4、如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?

发送方确认模式 将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制 接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。 这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性

下面罗列几种特殊情况: 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重) 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

5、如何避免消息重复投递或重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列; 在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重的依据,避免同一条消息被重复消费。

6、消息基于什么传输?

由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。

7、消息如何分发?

若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能

8、消息怎么路由?

消息提供方->路由->一至多个队列,消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。 消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则); 常用的交换器主要分为一下三种: fanout:如果交换器收到消息,将会广播到所有绑定的队列上 direct:如果路由键完全匹配,消息就被投递到相应的队列 topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符

9、如何确保消息不丢失?

消息持久化,当然前提是队列必须持久化。RabbitMQ确保持久性消息能从服务器重启中恢复的方式是 ,将它们写入磁盘上的一个持久化日志文件, 当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。

10、使用RabbitMQ有什么好处?

  • 服务间高度解耦
  • 异步通信性能高
  • 流量削峰

11、RabbitMQ的集群

Rabbitmq 集群

集群目的就是为了实现rabbitmq的高可用性,集群分为2种

普通集群:主备架构,只是实现主备方案,不至于主节点宕机导致整个服务无法使用 镜像集群:同步结构,基于普通集群实现的队列同步

普通集群

slave节点复制master节点的所有数据和状态,除了队列数据,队列数据只存在master节点,但是Rabbitmq slave节点可以实现队列的转发,也就是说消息消费者可以连接到slave节点,但是slave需要连接到master节点转发队列,由此说明只能保证了服务可以用,无法达到高可用 alt slave节点队列可以查看到,但是不会同步数据

镜像集群 基于普通集群实现队列的集群主从,消息会在集群中同步(至少三个节点) alt

12、mq的缺点

系统可用性降低 **系统引入的外部依赖越多,越容易挂掉,**本来你就是A系统调用BCD三个系统的接口就好了,本来ABCD四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整?MQ挂了,整套系统崩溃了,你不就完了么。

系统复杂性提高 硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?

一致性问题 A系统处理完了直接返回成功了,本来都以为你这个请求就成功了;但是问题是,BD两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。