R A B B I T M Q
- RabbitMQ 官网:https://www.rabbitmq.com/
- 【消息队列MQ】各类MQ比较:https://blog.csdn.net/sunxinhere/article/details/7968886
- https://blog.csdn.net/weixin_38305440/article/details/102810522
RabbitMQ 使用场景
# 服务解耦
假设有这样一个场景, 服务A
产生数据, 而 服务B,C,D
需要这些数据, 那么我们可以在 A服务
中直接调用 B,C,D服务
,把数据传递到下游服务即可
但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难
这是由于服务之间耦合度过于紧密
再来考虑用 RabbitMQ
解耦的情况
A服务
只<mark>需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息</mark> ,不再需要数据时则取消订阅即可
# 流量削峰
假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对
而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力
但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么<mark>我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了</mark>
这种情况,我们就可以使用 RabbitMQ
来进行 <mark>流量削峰</mark>
<mark>高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力</mark>
这是消息队列服务器非常典型的应用场景
这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力
这是消息队列服务器非常典型的应用场景
# 异步调用
考虑定外卖支付成功的情况
- 支付后要发送支付成功的通知,
- 再寻找外卖小哥来进行配送,
而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长
这样就造成整条调用链路响应非常缓慢
<mark>而如果我们引入 RabbitMQ
消息队列,订单数据可以发送到消息队列服务器,</mark>
<mark>那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右</mark>
<mark>寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作</mark>
RabbitMQ 基本概念 (术语)
RabbitMQ
是一种消息中间件,<mark>用于处理来自客户端的异步消息</mark>。
- 服务端将要发送的消息放入到队列池中。
- 接收端可以根据
RabbitMQ
配置的转发机制接收服务端发来的消息。 RabbitMQ
依据指定的转发规则进行消息的转发、缓冲和持久化操作,
主要用在 多服务器间
或 单服务器的子系统间
进行通信,是分布式系统标准的配置。
RabbitMQ 流程图(下面一一介绍每一步)
# Exchange (交换)
接受生产者发送的消息,并根据 Binding
规则 <mark>将消息路由给服务器中的队列</mark>。
ExchangeType
决定了 Exchange
路由消息的行为。
在RabbitMQ中,ExchangeType常用的有三种。
direct
(直接塞进去)Fanout
(数目划分)Topic
(类型划分)
.
# Message Queue (消息队列)
我们发送给 RabbitMQ
的消息最后都会到达各种 queue
,并且存储在其中(如果路由找不到相应的 queue
则数据会丢失),等待消费者来取。
# Binding Key
它表示的是 Exchange
与 Message Queue
是通过 binding key
进行联系的,这个关系是固定。
# Routing Key
生产者在将消息发送给 Exchange
的时候,一般会指定一个 routing key
routing key
来 <mark>指定这个消息的路由规则</mark>。
这个 routing key
需要与 Exchange Type
及 binding key
联合使用才能生效,<mark>我们的生产者只需要通过指定routing key来决定消息流向哪里</mark>。
RabbitMQ 安装
在centos7上安装rabbitmq
<mark>下面介绍两种:</mark>
- <mark>wget 一个一个依赖包下载</mark>(<mark>体验过程很重要</mark>)
- <mark>yum 用 rabbitmq 官网的脚本</mark>,一键安装
先介绍前者
# 安装erlang语言库
erlang 官网 : https://www.erlang.org/
安装过程参考:RabbitMQ安装,安装erlang依赖,安装RabbitMQ
erlang教程 :W3C school Erlang 教程
RabbitMQ
使用了 Erlang
开发语言,<mark>Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性</mark>
## 找语言包
我们可以冲 rabbitmq 官方 找到 rabbitmq官方精简的Erlang语言包
- 0依赖rpm安装包
- Github - https://github.com/rabbitmq/erlang-rpm
2020年3月10日
## 安装(rpm方式)
这个链接 https://www.cnblogs.com/swyy/p/11582309.html
介绍了 <mark>yum方式安装</mark> 和 <mark>源码安装</mark>
下载 rlang 的 rpm 包
# 下载Erlang语言包
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.6/erlang-21.2.6-1.el7.x86_64.rpm
这里 github 提供了三个下载的地方(下面提供最新地址 2020年3月10日)
- github-release :
https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.14/erlang-21.3.8.14-1.el7.x86_64.rpm
- packagecloud:
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.14-1.el7.x86_64.rpm/download.rpm
- bintray
https://bintray.com/rabbitmq-erlang/rpm/download_file?file_path=erlang%2F21%2Fel%2F7%2Fx86_64%2Ferlang-21.3.8.14-1.el7.x86_64.rpm
下载好 .rpm 软件包。
接下来是安装 .rpm 软件包
# 安装Erlang
rpm -ivh erlang-21.2.6-1.el7.x86_64.rpm --force --nodeps
博客园 Linux rpm命令详解
菜鸟教程 Linux rpm命令
-i:显示套件的相关信息;
-v:显示指令执行过程;
-h或–hash:套件安装时列出标记;
–force 忽略报错,强行安***r> –nodeps 不验证套件档的相互关联性。
查看是否安装成功
erl -version
出现“Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version 10.5”证明安装成功
# 安装 socat 依赖
rabbitmq
依赖 socat
socat
的官方网站: http://www.dest-unreach.org/socat/ 。
官网极度简洁。。。
刚进入都怀疑我关了 css
## socat简介
socat
是一个 <mark>多功能的网络工具</mark>,名字来由是” Socket CAT
”,可以看作是 netcat
的N
倍加强版
socat
是一个<mark>两个独立数据通道之间的双向数据传输的继电器</mark>。这些数据通道包含文件、管道、设备(终端或调制解调器等)、插座(Unix,IP4,IP6 - raw,UDP,TCP)、SSL、SOCKS4客户端或***CONNECT。
Socat
<mark>支持广播和多播、抽象Unix sockets、Linux tun/tap、GNU readline 和 PTY</mark>。它提供了分叉、记录和进程间通信的不同模式。多个选项可用于调整socat和其渠道,Socat可以作为TCP中继(一次性或守护进程),作为一个守护进程基于socksifier,作为一个shell Unix套接字接口,作为IP6的继电器,或面向TCP的程序重定向到一个串行线。
socat
的主要特点就是在两个数据流之间建立通道;
<mark>且支持众多协议和链接方式:ip, tcp, udp, ipv6, pipe,exec,system,open,proxy,openssl,socket等</mark>。
## socat依赖包
- https://pkgs.org/download/socat
- https://centos.pkgs.org/7/centos-x86_64/socat-1.7.3.2-2.el7.x86_64.rpm.html
下载最新的
# 下载 socat rpm 包
wget http://mirror.centos.org/altarch/7/os/armhfp/Packages/socat-1.7.3.2-2.el7.armv7hl.rpm
# 安装 socat 依赖包
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
检查是否安装成功
socat -V
# 安装 RabbitMQ-server
## 找 rpm 包
要么 在 pkgs.org 上继续找:https://pkgs.org/download/rabbitmq-server
https://centos.pkgs.org/7/epel-x86_64/rabbitmq-server-3.3.5-34.el7.noarch.rpm.html
wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/r/rabbitmq-server-3.3.5-34.el7.noarch.rpm
要么在 到 rabbitmq 官网下载
https://www.rabbitmq.com/install-rpm.html
https://packagecloud.io/rabbitmq/rabbitmq-server/
wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.3-1.el7.noarch.rpm/download.rpm
这里官网的最新 用 官网的 (2020年3月10日)
然后依然是 安装
rpm -ivh rabbitmq-server-3.8.3-1.el7.noarch.rpm
最后检验是否安装成功
rabbitmqctl version
CTL是英文ConTroL(控制)的缩写
# Yum 安装
用 yum 安装,先配好 阿里云国内镜像啊 ,不然很慢 https://editor.csdn.net/md/?articleId=103931092
## 需要手动安装的依赖
https://www.rabbitmq.com/install-rpm.html#package-dependencies
- logrotate 貌似不用
### erlang
https://www.rabbitmq.com/install-rpm.html#install-erlang-from-epel-repository
安装必要环境 erlang
yum install erlang -y
检查是否安装
erl -version
### socat
yum install socat -y
socat -V
## rabbitmq-server
https://www.rabbitmq.com/install-rpm.html#package-cloud
https://packagecloud.io/rabbitmq/erlang/install#bash-rpm
# import the new PackageCloud key that will be used starting December 1st, 2018 (GMT)
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
# import the old PackageCloud key that will be discontinued on December 1st, 2018 (GMT)
rpm --import https://packagecloud.io/gpg.key
# After importing both keys please follow the Package Cloud repository setup instructions.
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install rabbitmq-server -y
检查
rabbitmqctl version
done(两种安装都介绍完了)
RabbitMQ 启动
rabbitmq常用命令 - https://blog.csdn.net/shaoyunzhe/article/details/96461703
# rabbitmq启动和停止命令
# 设置服务,开机自动启动
chkconfig rabbitmq-server on
# 建议使用 systemctl enable rabbitmq-server
# 启动服务
service rabbitmq-server start
# 建议使用 systemctl start rabbitmq-server
# 停止服务
service rabbitmq-server stop
# 建议使用 systemctl stop rabbitmq-server
查看状态:rabbitmqctl status
# 启动 rabbitmq 管理界面
# 开启管理界面插件
rabbitmq-plugins enable rabbitmq_management
查看插件打开情况:
rabbitmq-plugins list
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
# 防火墙打开 15672 管理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
查看开启的端口
firewall-cmd --zone=public --list-ports
## 访问
访问服务器的15672端口,例如:
# 添加用户
# 添加用户
rabbitmqctl add_user admin admin
查看用户:rabbitmqctl list_users
# 新用户设置用户为超级管理员
rabbitmqctl set_user_tags admin administrator
# 如果失败了
# 开放 4369 端口– erlang发现端口
# firewall-cmd --zone=public --add-port=4369/tcp --permanent
# firewall-cmd --reload
查看用户:
rabbitmqctl list_users
## 设置访问权限
登录:admin
密码:admin
# 开放客户端连接端口
# 打开客户端连接端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
主要端口介绍
- 4369 – erlang发现口
- 5672 – client端通信口
- 15672 – 管理界面ui端口
- 25672 – server间内部通信口
查看开放的端口
firewall-cmd --zone=public --list-ports
rabbitmq六种工作模式
# 简单模式
RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。
- 发送消息的程序是生产者
- 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
- 消费者等待从队列接收消息
## pom.xml
添加 slf4j 依赖, 和 rabbitmq amqp 依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
## 生产者发送信息
package rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
//创建连接工厂,并设置连接信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);//可选,5672是默认端口
f.setUsername("admin");
f.setPassword("admin");
/* * 与rabbitmq服务器建立连接, * rabbitmq服务器端使用的是nio,会复用tcp连接, * 并开辟多个信道与客户端通信 * 以减轻服务器端建立连接的开销 */
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
/* * 声明队列,会在rabbitmq中创建一个队列 * 如果已经创建过该队列,就不能再使用其他参数来创建 * * 参数含义: * -queue: 队列名称 * -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在 * -exclusive: 排他,true表示限制仅当前连接可用 * -autoDelete: 当最后一个消费者断开后,是否删除队列 * -arguments: 其他参数 */
ch.queueDeclare("helloworld", false,false,false,null);
/* * 发布消息 * 这里把消息向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含义: * -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是目标队列名称 * -props: 其他参数,例如头信息 * -body: 消息内容byte[]数组 */
ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
System.out.println("消息已发送");
c.close();
}
}