准备工作
RabbitMQMQ的简介和安装
MQ(Message Queue) 意为消息队列,运用的是生产者和消费者的设计模型,借用队列的数据结构:先进先出的特点,生产者可以不断的向队列中生产消息,消费者则可以在队列的另一端不断的消费消息,并且生产和消费的过程都是异步进行的,而且只关心消息的生产和接收,没有其它的业务的侵入,可以轻松实现系统的解耦,所以MQ又有一个别名叫消息中间件
。
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据同学来实现分布式系统的集成。
市面上常用的MQ的区别:
MQ | 特点 |
---|---|
ActiveMQ | Apache出品,最流行,纯Java程序,运行需要Java虚拟机支持 |
Kafka | 开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,RocketMQ是对Kafka的优化 |
RabbitMQ | Erlang语言开发的消息队列模型,基于AMQP协议,主要特征是面向消息,队列,路由:包括点对点和消息订阅,稳定,安全并且可靠。 |
AMQP协议模型:
学过计算机网络的同学应该看的会很明白,队列类似一个路由器连接交换机的操作,交换机上面可以连接多个队列,生产者就可以指定交接机上面的队列向指定的队列发送消息,也可以用广播的方式发送消息到各个队列中,只要是接入该队列的消费者,就可以消费该消息。
安装:
链接:https://pan.baidu.com/s/1NKYymTioemob2FAK9z_z6g
提取码:d8kz
复制这段内容后打开百度网盘手机App,操作更方便哦
下载上面安装包,然后在自己服务器上按照下面步骤安装MQ
# 1.将rabbitmq安装包上传到linux系统中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm
# 2.安装Erlang依赖包
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
# 3.安装RabbitMQ安装包(需要联网)
yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要
将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
# 4.复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
# 5.查看配置文件位置
ls /etc/rabbitmq/rabbitmq.config
# 6.修改配置文件(参见下图:)
vim /etc/rabbitmq/rabbitmq.config
然后将61行的前面两个%%
和尾部的,
去掉
最后启动MQ的Web插件支持
# 7.执行如下命令,启动rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
出现如下说明:
Enabling plugins on node rabbit@localhost:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
# 8.启动RabbitMQ的服务
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
# 9.查看服务状态(见下图:)
systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
Main PID: 2904 (beam.smp)
Status: "Initialized"
CGroup: /system.slice/rabbitmq-server.service
├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
MBlmbcs...
├─3220 erl_child_setup 32768
├─3243 inet_gethost 4
└─3244 inet_gethost 4
.........
访问服务器地址:http://IP地址:15672/
RabbitMQ消息模型
登录官网可以看到:RabbitMq的消息模型
1. “Hello World!” 直连模式
这个是最简单的消息队列模型,其中生产者和消费者是一一对应的。
消息队列为红色部分,类似于一个邮箱,P就类似与一个邮递员,可以往邮箱中传递邮件,邮箱充当邮件消息的缓冲,C就类似于我们收件人,可以从邮箱中去出邮件。
步骤1:导入MQ相关Maven依赖:
<!-- rabbit mq相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
步骤2:登录rabbitmq服务器,建立队列
步骤3:生产者代码:
@SpringBootTest
class Rabbitmq01ApplicationTests {
//mq的生产者使用单元测试类
@Test
void contextLoads() throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置远程rabbitmq-server主机地址
connectionFactory.setHost("192.168.2.163");
//3.设置访问rabbitmq-server的用户名和密码 端口
connectionFactory.setUsername("liuzeyu12a");
connectionFactory.setPassword("809080");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
//3.创建channel通道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4.设置管道参数
//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
channel.queueDeclare("test",false,false,false,null);
//5.发布消息
//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
channel.close();
connection.close();
}
}
步骤4:消费者代码
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置远程rabbitmq-server主机地址
connectionFactory.setHost("192.168.2.163");
connectionFactory.setVirtualHost("/ems");
connectionFactory.setPort(5672);
connectionFactory.setUsername("liuzeyu12a");
connectionFactory.setPassword("809080");
//3.创建channel通道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4.设置通道参数
channel.queueDeclare("test",false,false,false,null);
//消费消息
channel.basicConsume("test", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消费的消息在body中
System.out.println("消费的消息:" + new String(body).toString());
}
});
}
观察现象:
先运行生产者后,在队列中我们会看到一条待消费的消息
再次运行消费者
看到存到消息队列中的消息被打印出来了,也就是被消费了,再次查看队列的消息
需要注意的有:
- 生产者和消费者建立通道的时候,参数必须一一对应
- 消费者一般是没有去关闭连接的,因为它要随时监听队列中是否有消息,紧接着消费它
- 如果想要看到消费者打印消息在控制台,就要避免将消费者写在
@Test
中使用,原因是springboot单元测试不支持多线程测试,回调函数可能还没调用到主线程已经结束。
1.1 工具类封装
看到生产者和消费者的代码出现很多的冗余,封装一下代码!
//类似与JDBC
public class RabbitMqUtils {
private static ConnectionFactory connectionFactory;
static {
//1.创建连接工厂
connectionFactory = new ConnectionFactory();
//2.设置远程rabbitmq-server主机地址
connectionFactory.setHost("192.168.2.163");
//3.设置访问rabbitmq-server的用户名和密码 端口
connectionFactory.setUsername("liuzeyu12a");
connectionFactory.setPassword("809080");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
}
//获取连接
public static Connection getConnect() {
try {
return connectionFactory.newConnection();
}catch (Exception e){
e.printStackTrace();
}
return null;
}
public static void close(Channel channel,Connection connection){
try {
if(channel != null){
channel.close();
}
if(connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
使用工具类重新改造生产者:
@SpringBootTest
class Rabbitmq01ApplicationTests {
//mq的生产者使用单元测试类
@Test
void contextLoads() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//4.设置管道参数
//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
channel.queueDeclare("test",false,false,false,null);
//发布消息
//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
//3.释放资源
RabbitMqUtils.close(channel,connection);
}
}
使用工具类重新改造消费者:
//监听队列消费消息
public class Consumer {
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//2.设置通道参数
channel.queueDeclare("test",false,false,false,null);
//3.消费消息
channel.basicConsume("test", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消费的消息在body中
System.out.println("消费的消息:" + new String(body).toString());
}
});
}
}
1.2 细节
- 持久化
如果我们的生产者在队列中产生了消息,消费者还没消费,rabbitmq服务器重启了,消息和队列就会被清空,如何避免这一情况呢?
简单解决办法有两个
将之前的队列删掉,用代码来生成新的队列,但要注意的是将channel.queueDeclare
的第二个参数设置成true,表示该队列支持持久化
//mq的生产者使用单元测试类
@Test
void contextLoads() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//4.设置管道参数
//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
channel.queueDeclare("test",true,false,false,null);
//发布消息
//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
//3.释放资源
RabbitMqUtils.close(channel,connection);
}
还有一个方法就是直接用官网提供的界面的创建队列,并且选择持久化
配置完消息队列的持久化了,测试一下重启[root@localhost ~]# systemctl restart rabbitmq-server
发现队列虽然持久化了,但是消息并没有被持久化,这样看起来也毫无意义了,所以我们也可以消息持久化。
@Test
void contextLoads() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//4.设置管道参数
//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
channel.queueDeclare("test",true,false,false,null);
//发布消息
//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
//3.释放资源
RabbitMqUtils.close(channel,connection);
}
channel.basicPublish()
的第三个参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN,表示的是持久化消息。
channel.basicPublish()
第三个参数是是否独占队列,一般我们都设置为false表示不同通道都可以向队列中发送消息,不独占。
channel.basicPublish()
还有第四个参数是是否在消费者消费完之后,删除队列,看具体的业务场景。
2. work Queue
任务队列跟第一种的直连相比,会多出多个消费者,这种场景一般出现在,生产者的生产消息速度 > 消费者的消费速度,如果没有多个消费者一起消费队列中的消息,会引起队列的堵塞,消息的传递的也会不及时。
2.1 轮询
实例:
- 生产者:
//生产者
public class Provider {
public static void main(String[] args) throws IOException {
//1获取连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.设置通道参数
channel.queueDeclare("test",true,false,false,null);
//4.发送消息
for (int i = 1; i <= 20; i++) {
channel.basicPublish("", "test",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"->meg:").getBytes());
}
RabbitMqUtils.close(channel,connect);
}
}
- 消费者1:
//消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.配置通道参数
channel.queueDeclare("test",true,false,false,null);
//
channel.basicConsume("test",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
}
});
}
}
- 消费者2:
//消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException {
//1.获取连接
Connection connect = RabbitMqUtils.getConnect();
//2.获取通道
Channel channel = connect.createChannel();
//3.配置通道参数
channel.queueDeclare("test",true,false,false,null);
//4.接收消息
channel.basicConsume("test",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+new String(body));
}
});
}
}
为了测试的准确性,先把之前的队列清空。
依次运行消费者1,消费者2。
然后运行生产者->
观察结果消费者1运行结果:
消费者2:
发现消费者1和2是轮询从队列中消费,为了进一步验证这个结果,可以染消费者2的消费速度放慢
public class Consumer2 {
public static void main(String[] args) throws IOException {
//1.获取连接
Connection connect = RabbitMqUtils.getConnect();
//2.获取通道
Channel channel = connect.createChannel();
//3.配置通道参数
channel.queueDeclare("test",true,false,false,null);
//4.接收消息
channel.basicConsume("test",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("消费者2:"+new String(body));
}
});
}
}
结果仍然是依次交替消费数据,与我们的猜想一致。
2.2 消息自动确认机制
消息自动确认机制 指的是消费者消费完消息之后向rabbitmq发送确认消息消费的一种机制。
我们的任务消息队列,消费者默认是轮询消费队列中的消息,无论消费者他们各自的消费速度如何,都将分配同样个数的消息。
但是正常情况下,我们又希望消费快的能够多消费一点消息,消费慢的少消费一点,达到能者多劳的目的。
要想实现这一目的,我们就必须关闭掉消息的自动确认机制。
生产者:
//生产者
public class Provider {
public static void main(String[] args) throws IOException {
//1获取连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.设置通道参数
channel.queueDeclare("test",true,false,false,null);
//4.发送消息
for (int i = 1; i <= 20; i++) {
channel.basicPublish("", "test",MessageProperties.PERSISTENT_TEXT_PLAIN,(i +"->meg:").getBytes());
}
RabbitMqUtils.close(channel,connect);
}
}
消费者1:
//消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.配置通道参数
channel.queueDeclare("test",true,false,false,null);
//
channel.basicQos(1); //每次获取一条消息
channel.basicConsume("test",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false); //手动确认
}
});
}
}
消费者2:
//消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.获取连接
Connection connect = RabbitMqUtils.getConnect();
//2.获取通道
Channel channel = connect.createChannel();
//3.配置通道参数
channel.queueDeclare("test",true,false,false,null);
channel.basicQos(1);//每次获取一条消息
//4.接收消息
channel.basicConsume("test",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("消费者2:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false); //手动确认消费
}
});
}
}
现象:
做到了能者多劳!
<mark>自动确认机制存在的问题:</mark>
可以避免消息丢失的问题,如果消费者1和消费者2同时在消费消息,默认情况下用的是自动确认,如果消费者1宕机了,消息才消费到一半,但是队列已经得到它的自动确认标志,认为已经消费完了,这就会造成消息丢失。
<mark>关闭自动确认机制有一个很好的好处是:</mark>
但是关闭了自动确认机制就会等到我们具体的业务执行完,我们可以手动提交消息消费标志给mq服务器,告诉它消息已经执行完,这中间执行我们业务即使宕机,消息也不会丢失,因为我们是在最后告诉mq消息被消费完了,具体业务并没有执行完,消息还在,服务器我们就可以再利用消息执行相关具体业务。
3. 广播 fanout模式
这是一种广播的消息模型,生产者生产的消息,要想发送给多个消费者,这中间要借助于交换机,多个交换机与队列进行绑定,那么每个队列中都会接收到生产者发送的消息,与队列绑定的消费者就能接收到信息。
这个和计算机网络中的交换机实现的是相同的原理,多个路由器与交换机绑定,那么才会实现广播路由的分发。
编码实现:
//生产者广播
public class Provider {
public static void main(String[] args) throws IOException {
//创建连接
Connection connect = RabbitMqUtils.getConnect();
//获取通道
Channel channel = connect.createChannel();
//通道绑定交换机 参数1:交换机名称 参数2:类型为广播
channel.exchangeDeclare("log","fanout");
//发送消息到交换
channel.basicPublish("log","",null,"你好交换机".getBytes());
RabbitMqUtils.close(channel,connect);
}
}
观察交换机
运行生产者代码:
多出一个log的交换机,使用的是广播的类型 fanout
我们建立多个消费者,绑定这台交换机,测试能否收到消息。
消费者1:
//消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.绑定交换机
channel.exchangeDeclare("log","fanout");
//4.创建临时队列
String queue = channel.queueDeclare().getQueue();
//5.将临时队列绑定交换机
channel.queueBind(queue,"log","");
//6.接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
}
}
消费者2:
//消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connect = RabbitMqUtils.getConnect();
//2.创建通道
Channel channel = connect.createChannel();
//3.绑定交换机
channel.exchangeDeclare("log","fanout");
//4.创建临时队列
String queue = channel.queueDeclare().getQueue();
//5.将临时队列绑定交换机
channel.queueBind(queue,"log","");
//6.接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:"+ new String(body));
}
});
}
}
优先启动消费者1,2,生产者发送消息
消费者生成了两个临时队列
接着运行生产者
可以发现做到了一个消息多个消费者消费的目的,适用的场景:登录/注册这一举动可以送积分,送奖品,发布订阅消息。
当停止消费者,临时队列也跟着消失
4. 直连路由
直连路由路由的工作模式:Receiving messages selectively
,翻译过来也就是,消费者可以选择性的接收到生产者的消息。
这相比广播的方式会灵活点,广播就是不要消费要不要,只要和消费者的队列和交换机绑定,那么消费者就一定会接收到消息。
直连路由的业务场景可以用于日志系统,如上图,可以把C1比作磁盘,C2比作控制台,我们系统产生的日志如果error类型要存放到磁盘的同时在控制台打印,而info,warn,error都要在控制台打印,这个时候广播的方式已经控制不了了,就要适用新的消息模型,直连路由的方式。
生产者:
//具有路由形式的直连
public class Provider {
public static void main(String[] args) throws IOException {
//创建连接
Connection connect = RabbitMqUtils.getConnect();
//机获取通道
Channel channel = connect.createChannel();
//配置通道配置
channel.exchangeDeclare("routing_direct","direct");
//获取临时队列
String queue = channel.queueDeclare().getQueue();
//发送消息
String routingKey1 = "info";
channel.basicPublish("routing_direct",routingKey1,
null,("这是一种直连路由,routingKey:"+routingKey1).getBytes());
String routingKey2 = "error";
channel.basicPublish("routing_direct",routingKey2,
null,("这是一种直连路由,routingKey:"+routingKey2).getBytes());
String routingKey3 = "warning";
channel.basicPublish("routing_direct",routingKey3,
null,("这是一种直连路由,routingKey:"+routingKey3).getBytes());
//是否资源
RabbitMqUtils.close(channel,connect);
}
}
消费者C1:
//具有路由形式的直连
public class Consumer1 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connect = RabbitMqUtils.getConnect();
//获取通道
Channel channel = connect.createChannel();
//配置通道
channel.exchangeDeclare("routing_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换
channel.queueBind(queue,"routing_direct","error");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
}
}
消费者C2:
//具有路由形式的直连
public class Consumer2 {
public static void main(String[] args) throws IOException {
//获取连接
Connection connect = RabbitMqUtils.getConnect();
//获取通道
Channel channel = connect.createChannel();
//配置通道
channel.exchangeDeclare("routing_direct","direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换
channel.queueBind(queue,"routing_direct","info");
channel.queueBind(queue,"routing_direct","warning");
channel.queueBind(queue,"routing_direct","error");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:"+ new String(body));
}
});
}
}
输出:
通过现象也可以看出,我们可以通过交换机绑定的路由key 和队列与交换机绑定的路由key产生对应关系,来决定消息的指向性发送。
5. 动态路由
动态路由是直连路由的升级版,更为灵活的的控制路由的分发,通过通配符来控制路由的下发。
通配符有两种:
*:表示user.*代表的是匹配user.[一个单词],user.findAll,userdelete都会被匹配上
#:表示user.#代表的是匹配user.[0个单词/多个单词],user,user.findAll,userdelete,user.findAll.***都会被匹配上
生产者:
//动态路由 Provider
public class Provider {
public static void main(String[] args) throws IOException {
//创建连接
Connection connect = RabbitMqUtils.getConnect();
//获取通道
Channel channel = connect.createChannel();
//发送消息
String routingKey = "user.findAll";
channel.basicPublish("topics",routingKey,null,("动态路由,通配符:"+routingKey).getBytes());
//释放资源
RabbitMqUtils.close(channel,connect);
}
}
消费者1:
//动态路由 消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException {
//创建连接
Connection connect = RabbitMqUtils.getConnect();
//获取通道
Channel channel = connect.createChannel();
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机
channel.exchangeDeclare("topics","topic");
channel.queueBind(queue,"topics","user.*");
//接收消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1:" + new String(body));
}
});
}
}
消费者1会接收到消息,因为它匹配到了生产者的key,user.findAll,消费者的user.*匹配到一个单词了,将user.*改为user.#同样可以匹配到。
如果生产者绑定的是
String routingKey = "user.findAll.***";
消费者则必须适用
//绑定交换机
channel.exchangeDeclare("topics","topic");
channel.queueBind(queue,"topics","user.#");
才可以匹配上。
6. SpringBoot整合RabbitMQ
实际开发中,springboot是最常用的开发平台,我们也要学会在SpringBoot平台上适用RabbitMQ
第一步:导入相关依赖(建议Springboot适用2.1.x)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:创建生产者测试类
@SpringBootTest(classes = Rabbitmq01Application.class)
@RunWith(SpringRunner.class)
public class SpringbootMQ {
@Autowired
private RabbitTemplate rabbitTemplate;
}
6.1 HELLO WORLD
其中RabbitTemplate 适用操作MQ的一套模板,可以简化我们操作MQ
生产者:
//HELLO WORLD
@Test
public void hello(){
//参数1:队列名称 参数2:要发送队列的内容
rabbitTemplate.convertAndSend("boot-mq","springboot hello world");
}
当我们运行完生产者后,并不会在队列中生成队列,因为使用rabbitTemplate创建队列并发送消息,此时并没有消费者绑定改队列,所以我们需要一个消费者:
@Component
public class Consumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("boot-mq")) //也可以用在类上
public void receive1(String message){
System.out.println("message:"+message);
}
}
最后我们运行一下生产者
消息被消费的信息打印在了控制台。
6.2 work Queue
生产者:
//work Queue
@Test
public void work(){
//参数1:队列名称 参数2:要发送队列的内容
rabbitTemplate.convertAndSend("work","springboot hello work Queue");
}
消费者:
@Component
public class Consumer1 {
// @RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("message1:"+message);
}
// @RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("message2:"+message);
}
}
生产运行完之后发现,消息会被消费者1或2任意一个消费,类似,我们生产者创建多个消息
@Test
public void work(){
//参数1:队列名称 参数2:要发送队列的内容
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","springboot hello work Queue");
}
}
再次运行
多个消费者还是进行轮询的消费,符合我们预期的场景。
6.3. 广播 fanout模式
生产者:
//fanout
@Test
public void fanout(){
rabbitTemplate.convertAndSend("logs","","springboot fanout Queue");
}
消费者:
@Component
public class FanoutConsumer {
//创建临时队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name = "logs",type = "fanout")
))
public void receive1(String message){
System.out.println("message1:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name = "logs",type = "fanout")
))
public void receive2(String message){
System.out.println("message2:"+message);
}
}
运行生产者,发现消息同时被两个消费者消费了。
6.4 直连路由
生产者:
//direct
@Test
public void direct(){
rabbitTemplate.convertAndSend("direct","info","springboot direct Queue");
}
发送key = info的消息
消费者:
@Component
public class ConsumerDirect {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //临时队列
key = {
"error","warn","info"},
exchange = @Exchange(name = "direct",type = "direct")
))
public void receive1(String message){
System.out.println("message1:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //临时队列
key = {
"error"},
exchange = @Exchange(name = "direct",type = "direct")
))
public void receive2(String message){
System.out.println("message2:"+message);
}
}
此时只会有消费者1才能消费消息
6.5 动态路由
生产者:
@Test
public void topics(){
rabbitTemplate.convertAndSend("topics","product.save","springboot topics Queue");
}
消费者的key以product开头都可以匹配到
消费者
@Component
public class ConsumerTopics {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //临时队列
exchange = @Exchange(name = "topics",type = "topic"),
key = {
"user.*"}
))
public void receive1(String message){
System.out.println("message1:"+message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //临时队列
exchange = @Exchange(name="topics",type = "topic"),
key = {
"product.#"}
))
public void receive2(String message){
System.out.println("message2:"+message);
}
}
消费者2可以接收到消息
7. RabbitMQ应用场景
7.1 异步处理
场景:用户的注册:注册成功后发送短信通知用户
传统的实现方式有两种:串行
,并行
串行:
类似传统的单核CPU一样执行任务,效率很慢,而且环环相扣,程序之间的耦合度高
并行:
可以观察出,解耦的还不够彻底,因为我们的用户注册是为了使用,其实并不关心短信是否收到,所以我们的关心重点在于是否将用户的信息写入数据库,只要入库,用户就可以使用了。
MQ中间件解耦:
引入中间件后,由于将消息写入中间件的速度很快,甚至可以忽略不计,所以大大的减少了相应的时间,响应时间是串行的3倍,并行的2倍。
7.2 应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
如果使用订单系统和库存系统直接通信,会造成一一种情况,就是如果库存系统宕机,那么订单系统的订单将会丢失,造成用户的利益损失。
引入消息队列
- 订单系统:会在用户下单后,订单系统会发送消息到队列中进行持久化处理,返回用户下单成功
- 库存系统:获取消息的队列的订单,进行库存操作,即使订单系统故障,也会保证消息的可靠传递,不会丢失。
7.3 流量削峰
场景:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
此时的中间件可以发挥以下作用:
- 控制活动人数,超过一定量的请求订单直接丢弃(所以你的秒杀都不一定抢得到)
- 可以缓解短时间内高流量压垮应用,应用程序按照自己的处理能力处理订单
8. RabbitMQ集群
集群环境会使用Docker来部署,顺便自己再复习点Docker知识,因为用Docker来部署集群用于学习真的很方便。
8.1 RabbitMQ普通集群
普通集群:以三台服务器为例,主Master节点,两个从Slave,这边的RabbitMQ普通集群并不支持高可用的场景,它的工作原理是:主Master的交换机会和从的从交换机同步,它们所绑定的队列也会进行同步,但是队列中的消息并不会同步,当我们的消费者要消费消息的时候,有两个从服务器向主服务器拿。一旦主宕机,从服务器的消息就可能丢失,而且并不会进行故障切换,从服务器不会变成主服务器,做不到高可用!
实验的场景需要搭建服务器集群,一般的物理机同时运行三个服务器会卡,于是可以在docker上面部署集群。
可以参考这位大哥的集群部署教程:Docker环境RabbitMQ集群部署
搭建几个基本步骤:
- docker安装:
# step 1: 安装必要的一些系统工具
sudo yum install -y yum-utils
# Step 2: 添加软件源信息
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# Step 3: 更新并安装 Docker-CE(社区版)
sudo yum makecache fast
sudo yum -y install docker-ce
# Step 4: 开启Docker服务
sudo service docker start
- rabbitmq安装
#拉取镜像
# docker pull rabbitmq:3.8.3-management
#创建映射数据卷目录
#mkdir rabbitmqcluster
#cd rabbitmqcluster/
#mkdir rabbitmq01 rabbitmq02 rabbitmq03
[root@localhost rabbitmqcluster]# pwd
/home/liuzeyu12a/rabbitmqcluster
#启动三个mq容器
#docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -v /home/liuzeyu12a/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:3.8.3-management
#docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -v /home/liuzeyu12arabbitmqcluster/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 rabbitmq:3.8.3-management
#docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -v /home/liuzeyu12a/rabbitmqcluster/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02 rabbitmq:3.8.3-management
hostname 设置容器的主机名
RABBITMQ_ERLANG_COOKIE 节点认证作用,部署集成时 需要同步该值。
- 集群配置
# 进入主1容器内部
docker exec -it rabbitmqCluster01 bash
#重置内部配置
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl start_app
#exit
#进入从2内部
#docker exec -it rabbitmqCluster02 bash
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl join_cluster --ram rabbit@rabbitmq01
#rabbitmqctl start_app
#exit
#进入从3内部
#docker exec -it rabbitmqCluster03 bash
#rabbitmqctl stop_app
#rabbitmqctl reset
#rabbitmqctl join_cluster --ram rabbit@rabbitmq01
#rabbitmqctl start_app
#exit
配置完后可以进入:
搭建起来正常的就是上图。
我在配置集群的时候遇到电脑蓝屏重启了一下服务器,发现主服务器可以起来,从服务器也都可以起来,但是两台从服务器就是就是加不到主服务器中,报错信息:
root@rabbitmq03:/# rabbitmqctl join_cluster --ram rabbit@rabbitmq01
Clustering node rabbit@rabbitmq03 with rabbit@rabbitmq01
Error:
{
:inconsistent_cluster, 'Node rabbit@rabbitmq01 thinks it\'s clustered with node rabbit@rabbitmq03, but rabbit@rabbitmq03 disagrees'}
网上查阅了很多资料,最简单粗暴的方式就是将映射数据卷目录删除掉,然后重新创建,重新启动服务器,就可以重新配置主从了。
- 访问服务器
主:http://192.168.2.163:15672/
从1:http://192.168.2.163:15673/
从2:http://192.168.2.163:15674/
生产者:
@Test
public void contextLoads() throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//4.设置管道参数
//参数1:通道名称 参数2:mq服务重启,是否持久化 参数3:是否独占队列(一般为否) 参数5:是否消费完成 自动删除 参数6:其它额外属性
channel.queueDeclare("test",true,false,false,null);
//发布消息
//参数1:交换机 参数2:队列名称 参数3:传递消息的额外设置 参数4:消息内容
channel.basicPublish("","test", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello liuzeyu12a".getBytes());
//3.释放资源
RabbitMqUtils.close(channel,connection);
}
向主节点发送一个消息后:
从1
从2
消费者:
public static void main(String[] args) throws IOException {
//1.创建连接
Connection connection = RabbitMqUtils.getConnect();
Channel channel = connection.createChannel();
//2.设置通道参数
channel.queueDeclare("test",true,false,false,null);
//3.消费消息
channel.basicConsume("test", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//消费的消息在body中
System.out.println("消费的消息:" + new String(body).toString());
}
});
}
消费掉消息后,各个服务器队列中消息也会同步被消费。
现在我们重新向主服务器发送一条消息,然后down掉它。
[root@localhost rabbitmqcluster]# docker exec -it rabbitmqCluster01 bash
root@rabbitmq01:/# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq01 ...
root@rabbitmq01:/#
此时从服务器
最后去尝试消费消息
显然,此时从服务器连接从主服务器取的消息,因此消息消费失败,这也就验证了普通集群无法高可用的现象。
8.2 RabbitMQ镜像集群
镜像集群的工作原理主要还是镜像发挥的作用,在集群中的服务器配置了镜像,主从的镜像就会同步。队列和消息也会同步到各个镜像中,当主节点宕机后,主从就会发生切换,从节点变成主节点,从而实现高可用。
LVS和HaProxy都是防止服务器宕机导致服务不可用,做的负载均衡策略。
#策略说明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p <vhost>:可选参数,正对虚拟主机的队列进行设置
name:策略名称
Pattern:队列的匹配模式(正则)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:表示镜像队列的模式有(all/exactly/nodes)
all:集群中所有的节点进行镜像
exactly:指定个数的节点上进行镜像,节点个数由ha-params决定
nodes:指定节点上进行镜像,节点名称由ha-params指定
ha-params:用来配置ha-mode的参数
ha-sync-mode:进行队列中的消息的同步方式,有效值automatic和manual,一般配置automatic
priority:可选参数,优先级
查看当下的服务器集群从是否配置策略
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
root@rabbitmq01:/#
配置策略
向主节点发送消息后(也可以先配置策略再创建队列):
root@rabbitmq01:/# rabbitmqctl set_policy ha-all '^test' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Setting policy "ha-all" for pattern "^test" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "/" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
vhost name pattern apply-to definition priority
/ ha-all ^test all {
"ha-mode":"all","ha-sync-mode":"automatic"} 0
root@rabbitmq01:/#
只有test开头的队列才可以镜像自动同步。
测试现象:
发现并没有生效,经过排查发现没有指定虚拟主机配置策略,所有改主机下的队列就没有被匹配到。
重新配置策略:
root@rabbitmq01:/# rabbitmqctl set_policy -p '/ems' ha-all '^test' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Setting policy "ha-all" for pattern "^test" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "/ems" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
root@rabbitmq01:/#
主:
从1:
从2
现在我们down掉主节点!
从1
变成了主节点,并且状态还是up的
从2
尝试消费一下从2服务器
消费者要记得修改端口号,否则会获取连接失败。
此时的主节点消息也就被消费的了
如果此时把,之前down掉的主节点重新开启,它是否重新变成主节点?
root@rabbitmq01:/# rabbitmqctl start_app
Starting node rabbit@rabbitmq01 ...
completed with 3 plugins.
root@rabbitmq01:/#
可见它是不可能再变成主节点了
清除策略
root@rabbitmq01:/# rabbitmqctl clear_policy ha-all -p '/ems'
Clearing policy "ha-all" on vhost "/ems" ...
root@rabbitmq01:/# rabbitmqctl list_policies
Listing policies for vhost "/" ...
root@rabbitmq01:/#
队列的状态也就恢复成之前的普通集群样子