准备工作

RabbitMQMQ的简介和安装

MQ(Message Queue) 意为消息队列,运用的是生产者和消费者的设计模型,借用队列的数据结构:先进先出的特点,生产者可以不断的向队列中生产消息,消费者则可以在队列的另一端不断的消费消息,并且生产和消费的过程都是异步进行的,而且只关心消息的生产和接收,没有其它的业务的侵入,可以轻松实现系统的解耦,所以MQ又有一个别名叫消息中间件

通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据同学来实现分布式系统的集成。

市面上常用的MQ的区别:

MQ 特点
ActiveMQ Apache出品,最流行,纯Java程序,运行需要Java虚拟机支持
Kafka 开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务
RocketMQ RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,RocketMQ是对Kafka的优化
RabbitMQ Erlang语言开发的消息队列模型,基于AMQP协议,主要特征是面向消息,队列,路由:包括点对点和消息订阅,稳定,安全并且可靠。

AMQP协议模型:

学过计算机网络的同学应该看的会很明白,队列类似一个路由器连接交换机的操作,交换机上面可以连接多个队列,生产者就可以指定交接机上面的队列向指定的队列发送消息,也可以用广播的方式发送消息到各个队列中,只要是接入该队列的消费者,就可以消费该消息。

安装:

链接:https://pan.baidu.com/s/1NKYymTioemob2FAK9z_z6g
提取码:d8kz
复制这段内容后打开百度网盘手机App,操作更方便哦

下载上面安装包,然后在自己服务器上按照下面步骤安装MQ

# 1.将rabbitmq安装包上传到linux系统中
	erlang-22.0.7-1.el7.x86_64.rpm
	rabbitmq-server-3.7.18-1.el7.noarch.rpm

# 2.安装Erlang依赖包
	rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

# 3.安装RabbitMQ安装包(需要联网)
	yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
		注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要	
				将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
# 4.复制配置文件
	cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

# 5.查看配置文件位置
	ls /etc/rabbitmq/rabbitmq.config

# 6.修改配置文件(参见下图:)
	vim /etc/rabbitmq/rabbitmq.config 

然后将61行的前面两个%%和尾部的,去掉

最后启动MQ的Web插件支持

# 7.执行如下命令,启动rabbitmq中的插件管理
	rabbitmq-plugins enable rabbitmq_management
	
	出现如下说明:
		Enabling plugins on node rabbit@localhost:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@localhost...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch

    set 3 plugins.
    Offline change; changes will take effect at broker restart.

# 8.启动RabbitMQ的服务
	systemctl start rabbitmq-server
	systemctl restart rabbitmq-server
	systemctl stop rabbitmq-server
	

# 9.查看服务状态(见下图:)
	systemctl status rabbitmq-server
  ● rabbitmq-server.service - RabbitMQ broker
     Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
     Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
   Main PID: 2904 (beam.smp)
     Status: "Initialized"
     CGroup: /system.slice/rabbitmq-server.service
             ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
             MBlmbcs...
             ├─3220 erl_child_setup 32768
             ├─3243 inet_gethost 4
             └─3244 inet_gethost 4
      .........

访问服务器地址:http://IP地址:15672/

RabbitMQ消息模型

登录官网可以看到:RabbitMq的消息模型

1. “Hello World!” 直连模式

这个是最简单的消息队列模型,其中生产者和消费者是一一对应的。

消息队列为红色部分,类似于一个邮箱,P就类似与一个邮递员,可以往邮箱中传递邮件,邮箱充当邮件消息的缓冲,C就类似于我们收件人,可以从邮箱中去出邮件。

步骤1:导入MQ相关Maven依赖:

<!-- rabbit mq相关依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.7.2</version>
</dependency>

步骤2:登录rabbitmq服务器,建立队列

步骤3:生产者代码:

@SpringBootTest
class Rabbitmq01ApplicationTests {
   

	//mq的生产者使用单元测试类
	@Test
	void contextLoads() throws IOException, TimeoutException {
   

		//1.创建连接工厂
		ConnectionFactory connectionFactory = new ConnectionFactory();
		//2.设置远程rabbitmq-server主机地址
		connectionFactory.setHost("192.168.2.163");
		//3.设置访问rabbitmq-server的用户名和密码 端口
		connectionFactory.setUsername("liuzeyu12a");
		connectionFactory.setPassword("809080");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/ems");
		//3.创建channel通道
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		//4.设置管道参数
		//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
		channel.queueDeclare("test",false,false,false,null);

		//5.发布消息
		//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
		channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
		
		channel.close();
		connection.close();
	}

}

步骤4:消费者代码

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
   

        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置远程rabbitmq-server主机地址
        connectionFactory.setHost("192.168.2.163");
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("liuzeyu12a");
        connectionFactory.setPassword("809080");
        //3.创建channel通道
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //4.设置通道参数
        channel.queueDeclare("test",false,false,false,null);

        //消费消息
        channel.basicConsume("test", true, new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                //消费的消息在body中
                System.out.println("消费的消息:" + new String(body).toString());
            }
        });
    }

观察现象:

先运行生产者后,在队列中我们会看到一条待消费的消息

再次运行消费者

看到存到消息队列中的消息被打印出来了,也就是被消费了,再次查看队列的消息

需要注意的有:

  • 生产者和消费者建立通道的时候,参数必须一一对应
  • 消费者一般是没有去关闭连接的,因为它要随时监听队列中是否有消息,紧接着消费它
  • 如果想要看到消费者打印消息在控制台,就要避免将消费者写在@Test中使用,原因是springboot单元测试不支持多线程测试,回调函数可能还没调用到主线程已经结束。

1.1 工具类封装

看到生产者和消费者的代码出现很多的冗余,封装一下代码!

//类似与JDBC
public class RabbitMqUtils {
   

    private static ConnectionFactory connectionFactory;
    static {
   
        //1.创建连接工厂
        connectionFactory = new ConnectionFactory();
        //2.设置远程rabbitmq-server主机地址
        connectionFactory.setHost("192.168.2.163");
        //3.设置访问rabbitmq-server的用户名和密码 端口
        connectionFactory.setUsername("liuzeyu12a");
        connectionFactory.setPassword("809080");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
    }

    //获取连接
    public static Connection  getConnect() {
   
        try {
   
            return connectionFactory.newConnection();
        }catch (Exception e){
   
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Channel channel,Connection connection){
   
        try {
   
            if(channel != null){
   
                channel.close();
            }
            if(connection != null){
   
                connection.close();
            }
        }catch (Exception e){
   
            e.printStackTrace();
        }

    }
}

使用工具类重新改造生产者:

@SpringBootTest
class Rabbitmq01ApplicationTests {
   

	//mq的生产者使用单元测试类
	@Test
	void contextLoads() throws IOException, TimeoutException {
   
		Connection connection = RabbitMqUtils.getConnect();
		Channel channel = connection.createChannel();
		//4.设置管道参数
		//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
		channel.queueDeclare("test",false,false,false,null);

		//发布消息
		//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
		channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());

		//3.释放资源
		RabbitMqUtils.close(channel,connection);
	}

}

使用工具类重新改造消费者:

//监听队列消费消息
public class Consumer {
   

    public static void main(String[] args) throws IOException {
   

        //1.创建连接
        Connection connection = RabbitMqUtils.getConnect();
        Channel channel = connection.createChannel();
        //2.设置通道参数
        channel.queueDeclare("test",false,false,false,null);

        //3.消费消息
        channel.basicConsume("test", true, new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                //消费的消息在body中
                System.out.println("消费的消息:" + new String(body).toString());
            }
        });
    }
}

1.2 细节

  1. 持久化

如果我们的生产者在队列中产生了消息,消费者还没消费,rabbitmq服务器重启了,消息和队列就会被清空,如何避免这一情况呢?

简单解决办法有两个

将之前的队列删掉,用代码来生成新的队列,但要注意的是将channel.queueDeclare的第二个参数设置成true,表示该队列支持持久化

	//mq的生产者使用单元测试类
	@Test
	void contextLoads() throws IOException, TimeoutException {
   
		Connection connection = RabbitMqUtils.getConnect();
		Channel channel = connection.createChannel();
		//4.设置管道参数
		//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
		channel.queueDeclare("test",true,false,false,null);

		//发布消息
		//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
		channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());

		//3.释放资源
		RabbitMqUtils.close(channel,connection);
	}

还有一个方法就是直接用官网提供的界面的创建队列,并且选择持久化

配置完消息队列的持久化了,测试一下重启[root@localhost ~]# systemctl restart rabbitmq-server

发现队列虽然持久化了,但是消息并没有被持久化,这样看起来也毫无意义了,所以我们也可以消息持久化。

	@Test
	void contextLoads() throws IOException, TimeoutException {
   
		Connection connection = RabbitMqUtils.getConnect();
		Channel channel = connection.createChannel();
		//4.设置管道参数
		//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
		channel.queueDeclare("test",true,false,false,null);

		//发布消息
		//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
		channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());

		//3.释放资源
		RabbitMqUtils.close(channel,connection);
	}

channel.basicPublish()的第三个参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN,表示的是持久化消息。

channel.basicPublish()第三个参数是是否独占队列,一般我们都设置为false表示不同通道都可以向队列中发送消息,不独占。

channel.basicPublish()还有第四个参数是是否在消费者消费完之后,删除队列,看具体的业务场景。

2. work Queue


任务队列跟第一种的直连相比,会多出多个消费者,这种场景一般出现在,生产者的生产消息速度 > 消费者的消费速度,如果没有多个消费者一起消费队列中的消息,会引起队列的堵塞,消息的传递的也会不及时。

2.1 轮询

实例:

  1. 生产者:
//生产者
public class Provider {
   
    public static void main(String[] args) throws IOException {
   
        //1获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.设置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //4.发送消息
        for (int i = 1; i <= 20; i++) {
   
            channel.basicPublish("", "test",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"->meg:").getBytes());
        }
        RabbitMqUtils.close(channel,connect);
    }
}
  1. 消费者1:
//消费者1
public class Consumer1 {
   
    public static void main(String[] args) throws IOException {
   
        //1.创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.配置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //

        channel.basicConsume("test",true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+new String(body));
            }
        });
    }
}

  1. 消费者2:
//消费者2
public class Consumer2 {
   

    public static void main(String[] args) throws IOException {
   
        //1.获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.获取通道
        Channel channel = connect.createChannel();
        //3.配置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //4.接收消息
        channel.basicConsume("test",true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}

为了测试的准确性,先把之前的队列清空。
依次运行消费者1,消费者2。
然后运行生产者->

观察结果消费者1运行结果:

消费者2:

发现消费者1和2是轮询从队列中消费,为了进一步验证这个结果,可以染消费者2的消费速度放慢

public class Consumer2 {
   

    public static void main(String[] args) throws IOException {
   
        //1.获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.获取通道
        Channel channel = connect.createChannel();
        //3.配置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //4.接收消息
        channel.basicConsume("test",true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                try {
   
                    Thread.sleep(1000);
                }catch (Exception e){
   
                    e.printStackTrace();
                }
                System.out.println("消费者2:"+new String(body));
            }
        });
    }
}

结果仍然是依次交替消费数据,与我们的猜想一致。

2.2 消息自动确认机制

消息自动确认机制 指的是消费者消费完消息之后向rabbitmq发送确认消息消费的一种机制。

我们的任务消息队列,消费者默认是轮询消费队列中的消息,无论消费者他们各自的消费速度如何,都将分配同样个数的消息。

但是正常情况下,我们又希望消费快的能够多消费一点消息,消费慢的少消费一点,达到能者多劳的目的。

要想实现这一目的,我们就必须关闭掉消息的自动确认机制。

生产者:

//生产者
public class Provider {
   
    public static void main(String[] args) throws IOException {
   
        //1获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.设置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //4.发送消息
        for (int i = 1; i <= 20; i++) {
   
            channel.basicPublish("", "test",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"->meg:").getBytes());
        }
        RabbitMqUtils.close(channel,connect);
    }
}

消费者1:

//消费者1
public class Consumer1 {
   
    public static void main(String[] args) throws IOException {
   
        //1.创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.配置通道参数
        channel.queueDeclare("test",true,false,false,null);
        //
        channel.basicQos(1); //每次获取一条消息
        channel.basicConsume("test",false,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);  //手动确认
            }
        });
    }
}

消费者2:

//消费者2
public class Consumer2 {
   

    public static void main(String[] args) throws IOException, InterruptedException {
   
        //1.获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.获取通道
        Channel channel = connect.createChannel();
        //3.配置通道参数
        channel.queueDeclare("test",true,false,false,null);
        channel.basicQos(1);//每次获取一条消息
        //4.接收消息
        channel.basicConsume("test",false,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                try {
   
                    Thread.sleep(1000);
                }catch (Exception e){
   
                    e.printStackTrace();
                }
                System.out.println("消费者2:"+new String(body));
               channel.basicAck(envelope.getDeliveryTag(),false);  //手动确认消费
            }
        });
    }
}

现象:

做到了能者多劳!

<mark>自动确认机制存在的问题:</mark>
可以避免消息丢失的问题,如果消费者1和消费者2同时在消费消息,默认情况下用的是自动确认,如果消费者1宕机了,消息才消费到一半,但是队列已经得到它的自动确认标志,认为已经消费完了,这就会造成消息丢失。

<mark>关闭自动确认机制有一个很好的好处是:</mark>
但是关闭了自动确认机制就会等到我们具体的业务执行完,我们可以手动提交消息消费标志给mq服务器,告诉它消息已经执行完,这中间执行我们业务即使宕机,消息也不会丢失,因为我们是在最后告诉mq消息被消费完了,具体业务并没有执行完,消息还在,服务器我们就可以再利用消息执行相关具体业务。

3. 广播 fanout模式


这是一种广播的消息模型,生产者生产的消息,要想发送给多个消费者,这中间要借助于交换机,多个交换机与队列进行绑定,那么每个队列中都会接收到生产者发送的消息,与队列绑定的消费者就能接收到信息。

这个和计算机网络中的交换机实现的是相同的原理,多个路由器与交换机绑定,那么才会实现广播路由的分发。

编码实现:

//生产者广播
public class Provider {
   

    public static void main(String[] args) throws IOException {
   
        //创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //获取通道
        Channel channel = connect.createChannel();
        //通道绑定交换机 参数1:交换机名称 参数2:类型为广播
        channel.exchangeDeclare("log","fanout");
        //发送消息到交换
        channel.basicPublish("log","",null,"你好交换机".getBytes());

        RabbitMqUtils.close(channel,connect);
    }
}

观察交换机

运行生产者代码:


多出一个log的交换机,使用的是广播的类型 fanout

我们建立多个消费者,绑定这台交换机,测试能否收到消息。

消费者1:

//消费者1
public class Consumer1 {
   

    public static void main(String[] args) throws IOException {
   
        //1.创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.绑定交换机
        channel.exchangeDeclare("log","fanout");
        //4.创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //5.将临时队列绑定交换机
        channel.queueBind(queue,"log","");
        //6.接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+ new String(body));
            }
        });
    }
}

消费者2:

//消费者2
public class Consumer2 {
   

    public static void main(String[] args) throws IOException {
   
        //1.创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //2.创建通道
        Channel channel = connect.createChannel();
        //3.绑定交换机
        channel.exchangeDeclare("log","fanout");
        //4.创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //5.将临时队列绑定交换机
        channel.queueBind(queue,"log","");
        //6.接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者2:"+ new String(body));
            }
        });
    }
}

优先启动消费者1,2,生产者发送消息

消费者生成了两个临时队列

接着运行生产者


可以发现做到了一个消息多个消费者消费的目的,适用的场景:登录/注册这一举动可以送积分,送奖品,发布订阅消息。

当停止消费者,临时队列也跟着消失

4. 直连路由


直连路由路由的工作模式:Receiving messages selectively ,翻译过来也就是,消费者可以选择性的接收到生产者的消息。

这相比广播的方式会灵活点,广播就是不要消费要不要,只要和消费者的队列和交换机绑定,那么消费者就一定会接收到消息。

直连路由的业务场景可以用于日志系统,如上图,可以把C1比作磁盘,C2比作控制台,我们系统产生的日志如果error类型要存放到磁盘的同时在控制台打印,而info,warn,error都要在控制台打印,这个时候广播的方式已经控制不了了,就要适用新的消息模型,直连路由的方式。

生产者:

//具有路由形式的直连
public class Provider {
   
    public static void main(String[] args) throws IOException {
   
        //创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //机获取通道
        Channel channel = connect.createChannel();
        //配置通道配置
        channel.exchangeDeclare("routing_direct","direct");
        //获取临时队列
        String queue = channel.queueDeclare().getQueue();
        //发送消息
        String routingKey1 = "info";
        channel.basicPublish("routing_direct",routingKey1,
                null,("这是一种直连路由,routingKey:"+routingKey1).getBytes());
        String routingKey2 = "error";
        channel.basicPublish("routing_direct",routingKey2,
                null,("这是一种直连路由,routingKey:"+routingKey2).getBytes());
        String routingKey3 = "warning";
        channel.basicPublish("routing_direct",routingKey3,
                null,("这是一种直连路由,routingKey:"+routingKey3).getBytes());
        //是否资源
        RabbitMqUtils.close(channel,connect);

    }
}

消费者C1:

//具有路由形式的直连
public class Consumer1 {
   
    public static void main(String[] args) throws IOException {
   
        //获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //获取通道
        Channel channel = connect.createChannel();
        //配置通道
        channel.exchangeDeclare("routing_direct","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换
        channel.queueBind(queue,"routing_direct","error");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+ new String(body));
            }
        });
    }
}

消费者C2:

//具有路由形式的直连
public class Consumer2 {
   
    public static void main(String[] args) throws IOException {
   
        //获取连接
        Connection connect = RabbitMqUtils.getConnect();
        //获取通道
        Channel channel = connect.createChannel();
        //配置通道
        channel.exchangeDeclare("routing_direct","direct");
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换
        channel.queueBind(queue,"routing_direct","info");
        channel.queueBind(queue,"routing_direct","warning");
        channel.queueBind(queue,"routing_direct","error");
        //消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:"+ new String(body));
            }
        });
    }
}

输出:


通过现象也可以看出,我们可以通过交换机绑定的路由key 和队列与交换机绑定的路由key产生对应关系,来决定消息的指向性发送。

5. 动态路由


动态路由是直连路由的升级版,更为灵活的的控制路由的分发,通过通配符来控制路由的下发。

通配符有两种:

*:表示user.*代表的是匹配user.[一个单词],user.findAll,userdelete都会被匹配上
#:表示user.#代表的是匹配user.[0个单词/多个单词],user,user.findAll,userdelete,user.findAll.***都会被匹配上

生产者:

//动态路由 Provider
public class Provider {
   

    public static void main(String[] args) throws IOException {
   
        //创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //获取通道
        Channel channel = connect.createChannel();
        //发送消息
        String routingKey = "user.findAll";
        channel.basicPublish("topics",routingKey,null,("动态路由,通配符:"+routingKey).getBytes());
        //释放资源
        RabbitMqUtils.close(channel,connect);
    }
}

消费者1:

//动态路由 消费者1
public class Consumer1 {
   

    public static void main(String[] args) throws IOException {
   
        //创建连接
        Connection connect = RabbitMqUtils.getConnect();
        //获取通道
        Channel channel = connect.createChannel();
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //绑定交换机
        channel.exchangeDeclare("topics","topic");
        channel.queueBind(queue,"topics","user.*");
        //接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

消费者1会接收到消息,因为它匹配到了生产者的key,user.findAll,消费者的user.*匹配到一个单词了,将user.*改为user.#同样可以匹配到。

如果生产者绑定的是

String routingKey = "user.findAll.***";

消费者则必须适用

//绑定交换机
channel.exchangeDeclare("topics","topic");
channel.queueBind(queue,"topics","user.#");

才可以匹配上。

6. SpringBoot整合RabbitMQ

实际开发中,springboot是最常用的开发平台,我们也要学会在SpringBoot平台上适用RabbitMQ

第一步:导入相关依赖(建议Springboot适用2.1.x)

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:创建生产者测试类

@SpringBootTest(classes = Rabbitmq01Application.class)
@RunWith(SpringRunner.class)
public class SpringbootMQ {
   

    @Autowired
    private RabbitTemplate rabbitTemplate;
}

6.1 HELLO WORLD

其中RabbitTemplate 适用操作MQ的一套模板,可以简化我们操作MQ


生产者:

    //HELLO WORLD
    @Test
    public void hello(){
   
        //参数1:队列名称 参数2:要发送队列的内容
        rabbitTemplate.convertAndSend("boot-mq","springboot hello world");
    }

当我们运行完生产者后,并不会在队列中生成队列,因为使用rabbitTemplate创建队列并发送消息,此时并没有消费者绑定改队列,所以我们需要一个消费者:

@Component
public class Consumer {
   

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("boot-mq"))  //也可以用在类上
    public void receive1(String message){
   
        System.out.println("message:"+message);
    }
}

最后我们运行一下生产者

消息被消费的信息打印在了控制台。

6.2 work Queue


生产者:

    //work Queue
    @Test
    public void work(){
   
        //参数1:队列名称 参数2:要发送队列的内容
            rabbitTemplate.convertAndSend("work","springboot hello work Queue");
    }

消费者:

@Component
public class Consumer1 {
   

// @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
   
        System.out.println("message1:"+message);
    }

// @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
   
        System.out.println("message2:"+message);
    }
}

生产运行完之后发现,消息会被消费者1或2任意一个消费,类似,我们生产者创建多个消息

    @Test
    public void work(){
   
        //参数1:队列名称 参数2:要发送队列的内容
        for (int i = 0; i < 10; i++) {
   
            rabbitTemplate.convertAndSend("work","springboot hello work Queue");
        }
    }

再次运行

多个消费者还是进行轮询的消费,符合我们预期的场景。

6.3. 广播 fanout模式


生产者:

    //fanout
    @Test
    public void fanout(){
   
        rabbitTemplate.convertAndSend("logs","","springboot fanout Queue");
    }

消费者:

@Component
public class FanoutConsumer {
   
//创建临时队列

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //创建临时队列
            exchange = @Exchange(name = "logs",type = "fanout")
    ))
    public void receive1(String message){
   
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,     //创建临时队列
            exchange = @Exchange(name = "logs",type = "fanout")
    ))
    public void receive2(String message){
   
        System.out.println("message2:"+message);
    }
}

运行生产者,发现消息同时被两个消费者消费了。

6.4 直连路由


生产者:

    //direct
    @Test
    public void direct(){
   
        rabbitTemplate.convertAndSend("direct","info","springboot direct Queue");
    }

发送key = info的消息

消费者:

@Component
public class ConsumerDirect {
   

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,  //临时队列
            key = {
   "error","warn","info"},
            exchange = @Exchange(name = "direct",type = "direct")
    ))
    public void receive1(String message){
   
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //临时队列
            key = {
   "error"},
            exchange = @Exchange(name = "direct",type = "direct")
    ))
    public void receive2(String message){
   
        System.out.println("message2:"+message);
    }
}

此时只会有消费者1才能消费消息

6.5 动态路由

生产者:

    @Test
    public void topics(){
   
        rabbitTemplate.convertAndSend("topics","product.save","springboot topics Queue");
    }

消费者的key以product开头都可以匹配到

消费者

@Component
public class ConsumerTopics {
   


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,  //临时队列
            exchange = @Exchange(name = "topics",type = "topic"),
            key = {
   "user.*"}
    ))
    public void receive1(String message){
   
        System.out.println("message1:"+message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,   //临时队列
            exchange = @Exchange(name="topics",type = "topic"),
            key = {
   "product.#"}
    ))
    public void receive2(String message){
   
        System.out.println("message2:"+message);

    }
}

消费者2可以接收到消息

7. RabbitMQ应用场景

7.1 异步处理

场景:用户的注册:注册成功后发送短信通知用户

传统的实现方式有两种:串行并行

串行:

类似传统的单核CPU一样执行任务,效率很慢,而且环环相扣,程序之间的耦合度高

并行:

可以观察出,解耦的还不够彻底,因为我们的用户注册是为了使用,其实并不关心短信是否收到,所以我们的关心重点在于是否将用户的信息写入数据库,只要入库,用户就可以使用了。

MQ中间件解耦:

引入中间件后,由于将消息写入中间件的速度很快,甚至可以忽略不计,所以大大的减少了相应的时间,响应时间是串行的3倍,并行的2倍。

7.2 应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.


如果使用订单系统和库存系统直接通信,会造成一一种情况,就是如果库存系统宕机,那么订单系统的订单将会丢失,造成用户的利益损失。

引入消息队列

  1. 订单系统:会在用户下单后,订单系统会发送消息到队列中进行持久化处理,返回用户下单成功
  2. 库存系统:获取消息的队列的订单,进行库存操作,即使订单系统故障,也会保证消息的可靠传递,不会丢失。

7.3 流量削峰

场景:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

此时的中间件可以发挥以下作用:

  1. 控制活动人数,超过一定量的请求订单直接丢弃(所以你的秒杀都不一定抢得到)
  2. 可以缓解短时间内高流量压垮应用,应用程序按照自己的处理能力处理订单

8. RabbitMQ集群

集群环境会使用Docker来部署,顺便自己再复习点Docker知识,因为用Docker来部署集群用于学习真的很方便。

8.1 RabbitMQ普通集群

普通集群:以三台服务器为例,主Master节点,两个从Slave,这边的RabbitMQ普通集群并不支持高可用的场景,它的工作原理是:主Master的交换机会和从的从交换机同步,它们所绑定的队列也会进行同步,但是队列中的消息并不会同步,当我们的消费者要消费消息的时候,有两个从服务器向主服务器拿。一旦主宕机,从服务器的消息就可能丢失,而且并不会进行故障切换,从服务器不会变成主服务器,做不到高可用!

实验的场景需要搭建服务器集群,一般的物理机同时运行三个服务器会卡,于是可以在docker上面部署集群。
可以参考这位大哥的集群部署教程:Docker环境RabbitMQ集群部署

搭建几个基本步骤:

  1. docker安装:
# step 1: 安装必要的一些系统工具
sudo yum install -y yum-utils
# Step 2: 添加软件源信息
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# Step 3: 更新并安装 Docker-CE(社区版)
sudo yum makecache fast
sudo yum -y install docker-ce
# Step 4: 开启Docker服务
sudo service docker start

  1. rabbitmq安装
#拉取镜像
# docker pull rabbitmq:3.8.3-management

#创建映射数据卷目录
#mkdir rabbitmqcluster
#cd rabbitmqcluster/
#mkdir rabbitmq01 rabbitmq02 rabbitmq03

[root@localhost rabbitmqcluster]# pwd
/home/liuzeyu12a/rabbitmqcluster

#启动三个mq容器
#docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -v /home/liuzeyu12a/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:3.8.3-management
#docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -v /home/liuzeyu12arabbitmqcluster/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie'  --link rabbitmqCluster01:rabbitmq01 rabbitmq:3.8.3-management
#docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -v /home/liuzeyu12a/rabbitmqcluster/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie'  --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02  rabbitmq:3.8.3-management

hostname 设置容器的主机名
RABBITMQ_ERLANG_COOKIE 节点认证作用,部署集成时 需要同步该值。

  1. 集群配置
# 进入主1容器内部
docker exec -it rabbitmqCluster01 bash
#重置内部配置
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl start_app
#exit

#进入从2内部
#docker exec -it rabbitmqCluster02 bash
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl join_cluster --ram rabbit@rabbitmq01
#rabbitmqctl start_app
#exit

#进入从3内部
#docker exec -it rabbitmqCluster03 bash
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl join_cluster --ram rabbit@rabbitmq01
#rabbitmqctl start_app
#exit

配置完后可以进入:

搭建起来正常的就是上图。

我在配置集群的时候遇到电脑蓝屏重启了一下服务器,发现主服务器可以起来,从服务器也都可以起来,但是两台从服务器就是就是加不到主服务器中,报错信息:

root@rabbitmq03:/# rabbitmqctl join_cluster --ram rabbit@rabbitmq01
Clustering node rabbit@rabbitmq03 with rabbit@rabbitmq01
Error:
{
   :inconsistent_cluster, 'Node rabbit@rabbitmq01 thinks it\'s clustered with node rabbit@rabbitmq03, but rabbit@rabbitmq03 disagrees'}

网上查阅了很多资料,最简单粗暴的方式就是将映射数据卷目录删除掉,然后重新创建,重新启动服务器,就可以重新配置主从了。

  1. 访问服务器
主:http://192.168.2.163:15672/
从1:http://192.168.2.163:15673/
从2:http://192.168.2.163:15674/

生产者:


    @Test
    public void contextLoads() throws IOException, TimeoutException {
   
        Connection connection = RabbitMqUtils.getConnect();
        Channel channel = connection.createChannel();
        //4.设置管道参数
        //参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
        channel.queueDeclare("test",true,false,false,null);

        //发布消息
        //参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
        channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());

        //3.释放资源
        RabbitMqUtils.close(channel,connection);
    }

向主节点发送一个消息后:

从1

从2

消费者:

    public static void main(String[] args) throws IOException {
   

        //1.创建连接
        Connection connection = RabbitMqUtils.getConnect();
        Channel channel = connection.createChannel();
        //2.设置通道参数
        channel.queueDeclare("test",true,false,false,null);

        //3.消费消息
        channel.basicConsume("test", true, new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                //消费的消息在body中
                System.out.println("消费的消息:" + new String(body).toString());
            }
        });
    }

消费掉消息后,各个服务器队列中消息也会同步被消费。

现在我们重新向主服务器发送一条消息,然后down掉它。

[root@localhost rabbitmqcluster]# docker exec -it rabbitmqCluster01 bash
root@rabbitmq01:/# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq01 ...
root@rabbitmq01:/# 

此时从服务器

最后去尝试消费消息

显然,此时从服务器连接从主服务器取的消息,因此消息消费失败,这也就验证了普通集群无法高可用的现象。

8.2 RabbitMQ镜像集群


镜像集群的工作原理主要还是镜像发挥的作用,在集群中的服务器配置了镜像,主从的镜像就会同步。队列和消息也会同步到各个镜像中,当主节点宕机后,主从就会发生切换,从节点变成主节点,从而实现高可用。

LVS和HaProxy都是防止服务器宕机导致服务不可用,做的负载均衡策略。

#策略说明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>

	-p <vhost>:可选参数,正对虚拟主机的队列进行设置
	name:策略名称
	Pattern:队列的匹配模式(正则)
	Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
		ha-mode:表示镜像队列的模式有(all/exactly/nodes)
			all:集群中所有的节点进行镜像
			exactly:指定个数的节点上进行镜像,节点个数由ha-params决定
			nodes:指定节点上进行镜像,节点名称由ha-params指定
		ha-params:用来配置ha-mode的参数
		ha-sync-mode:进行队列中的消息的同步方式,有效值automatic和manual,一般配置automatic
		priority:可选参数,优先级
			

查看当下的服务器集群从是否配置策略

root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
root@rabbitmq01:/#

配置策略

向主节点发送消息后(也可以先配置策略再创建队列):

root@rabbitmq01:/# rabbitmqctl set_policy ha-all '^test' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Setting policy "ha-all" for pattern "^test" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
vhost	name	pattern	apply-to	definition	priority
/	ha-all	^test	all	{
   "ha-mode":"all","ha-sync-mode":"automatic"}	0
root@rabbitmq01:/# 

只有test开头的队列才可以镜像自动同步。

测试现象:

发现并没有生效,经过排查发现没有指定虚拟主机配置策略,所有改主机下的队列就没有被匹配到。

重新配置策略:

root@rabbitmq01:/# rabbitmqctl set_policy -p '/ems' ha-all '^test' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Setting policy "ha-all" for pattern "^test" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "/ems" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...

root@rabbitmq01:/# 

主:

从1:

从2

现在我们down掉主节点!
从1

变成了主节点,并且状态还是up的

从2

尝试消费一下从2服务器

消费者要记得修改端口号,否则会获取连接失败。

此时的主节点消息也就被消费的了

如果此时把,之前down掉的主节点重新开启,它是否重新变成主节点?

root@rabbitmq01:/# rabbitmqctl start_app
Starting node rabbit@rabbitmq01 ...
 completed with 3 plugins.
root@rabbitmq01:/#

可见它是不可能再变成主节点了

清除策略

root@rabbitmq01:/# rabbitmqctl clear_policy ha-all -p '/ems'
Clearing policy "ha-all" on vhost "/ems" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...

root@rabbitmq01:/# 

队列的状态也就恢复成之前的普通集群样子