R A B B I T M Q

RabbitMQ 使用场景

# 服务解耦

假设有这样一个场景, 服务A 产生数据, 而 服务B,C,D 需要这些数据, 那么我们可以在 A服务 中直接调用 B,C,D服务 ,把数据传递到下游服务即可

但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

这是由于服务之间耦合度过于紧密

再来考虑用 RabbitMQ 解耦的情况

A服务 只<mark>需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息</mark> ,不再需要数据时则取消订阅即可

# 流量削峰

假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么<mark>我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了</mark>


这种情况,我们就可以使用 RabbitMQ 来进行 <mark>流量削峰</mark>

<mark>高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力</mark>

这是消息队列服务器非常典型的应用场景

这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

这是消息队列服务器非常典型的应用场景

# 异步调用

考虑定外卖支付成功的情况

  • 支付后要发送支付成功的通知,
  • 再寻找外卖小哥来进行配送,

而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

这样就造成整条调用链路响应非常缓慢

<mark>而如果我们引入 RabbitMQ 消息队列,订单数据可以发送到消息队列服务器,</mark>

<mark>那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右</mark>

<mark>寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作</mark>


RabbitMQ 基本概念 (术语)

RabbitMQ 是一种消息中间件,<mark>用于处理来自客户端的异步消息</mark>。

  • 服务端将要发送的消息放入到队列池中。
  • 接收端可以根据 RabbitMQ 配置的转发机制接收服务端发来的消息。
  • RabbitMQ 依据指定的转发规则进行消息的转发、缓冲和持久化操作,

主要用在 多服务器间单服务器的子系统间 进行通信,是分布式系统标准的配置。

RabbitMQ 流程图(下面一一介绍每一步)


# Exchange (交换)

接受生产者发送的消息,并根据 Binding 规则 <mark>将消息路由给服务器中的队列</mark>。

ExchangeType 决定了 Exchange 路由消息的行为。

在RabbitMQ中,ExchangeType常用的有三种。

  • direct (直接塞进去)
  • Fanout (数目划分)
  • Topic (类型划分)
    .

# Message Queue (消息队列)

我们发送给 RabbitMQ 的消息最后都会到达各种 queue ,并且存储在其中(如果路由找不到相应的 queue 则数据会丢失),等待消费者来取。

# Binding Key

它表示的是 ExchangeMessage Queue 是通过 binding key 进行联系的,这个关系是固定。

# Routing Key

生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key

routing key 来 <mark>指定这个消息的路由规则</mark>。

这个 routing key 需要与 Exchange Typebinding key 联合使用才能生效,<mark>我们的生产者只需要通过指定routing key来决定消息流向哪里</mark>。

RabbitMQ 安装

在centos7上安装rabbitmq

<mark>下面介绍两种:</mark>

  • <mark>wget 一个一个依赖包下载</mark>(<mark>体验过程很重要</mark>)
  • <mark>yum 用 rabbitmq 官网的脚本</mark>,一键安装

https://www.rabbitmq.com/install-rpm.html

先介绍前者

# 安装erlang语言库

erlang 官网 : https://www.erlang.org/

安装过程参考:RabbitMQ安装,安装erlang依赖,安装RabbitMQ
erlang教程 :W3C school Erlang 教程

RabbitMQ 使用了 Erlang 开发语言,<mark>Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性</mark>

## 找语言包

我们可以冲 rabbitmq 官方 找到 rabbitmq官方精简的Erlang语言包

## 安装(rpm方式)

这个链接 https://www.cnblogs.com/swyy/p/11582309.html
介绍了 <mark>yum方式安装</mark> 和 <mark>源码安装</mark>

下载 rlang 的 rpm 包

# 下载Erlang语言包
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.6/erlang-21.2.6-1.el7.x86_64.rpm

这里 github 提供了三个下载的地方(下面提供最新地址 2020年3月10日)

  • github-release : https://github.com/rabbitmq/erlang-rpm/releases/download/v21.3.8.14/erlang-21.3.8.14-1.el7.x86_64.rpm
  • packagecloud:
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-21.3.8.14-1.el7.x86_64.rpm/download.rpm
  • bintray https://bintray.com/rabbitmq-erlang/rpm/download_file?file_path=erlang%2F21%2Fel%2F7%2Fx86_64%2Ferlang-21.3.8.14-1.el7.x86_64.rpm

下载好 .rpm 软件包。
接下来是安装 .rpm 软件包

# 安装Erlang
rpm -ivh erlang-21.2.6-1.el7.x86_64.rpm --force --nodeps

博客园 Linux rpm命令详解
菜鸟教程 Linux rpm命令
-i:显示套件的相关信息;
-v:显示指令执行过程;
-h或–hash:套件安装时列出标记;
–force 忽略报错,强行安***r> –nodeps  不验证套件档的相互关联性。

查看是否安装成功

erl -version

出现“Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version 10.5”证明安装成功

# 安装 socat 依赖

rabbitmq 依赖 socat

socat 的官方网站: http://www.dest-unreach.org/socat/

官网极度简洁。。。
刚进入都怀疑我关了 css

## socat简介

socat 是一个 <mark>多功能的网络工具</mark>,名字来由是” Socket CAT ”,可以看作是 netcat 的N
倍加强版

socat 是一个<mark>两个独立数据通道之间的双向数据传输的继电器</mark>。这些数据通道包含文件、管道、设备(终端或调制解调器等)、插座(Unix,IP4,IP6 - raw,UDP,TCP)、SSL、SOCKS4客户端或***CONNECT。

Socat <mark>支持广播和多播、抽象Unix sockets、Linux tun/tap、GNU readline 和 PTY</mark>。它提供了分叉、记录和进程间通信的不同模式。多个选项可用于调整socat和其渠道,Socat可以作为TCP中继(一次性或守护进程),作为一个守护进程基于socksifier,作为一个shell Unix套接字接口,作为IP6的继电器,或面向TCP的程序重定向到一个串行线。

socat 的主要特点就是在两个数据流之间建立通道;
<mark>且支持众多协议和链接方式:ip, tcp, udp, ipv6, pipe,exec,system,open,proxy,openssl,socket等</mark>。

## socat依赖包

# 下载 socat rpm 包
wget	http://mirror.centos.org/altarch/7/os/armhfp/Packages/socat-1.7.3.2-2.el7.armv7hl.rpm
# 安装 socat 依赖包
rpm  -ivh socat-1.7.3.2-2.el7.x86_64.rpm


检查是否安装成功

socat -V

# 安装 RabbitMQ-server

## 找 rpm 包

要么 在 pkgs.org 上继续找:https://pkgs.org/download/rabbitmq-server
https://centos.pkgs.org/7/epel-x86_64/rabbitmq-server-3.3.5-34.el7.noarch.rpm.html

wget	https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/r/rabbitmq-server-3.3.5-34.el7.noarch.rpm

要么在 到 rabbitmq 官网下载
https://www.rabbitmq.com/install-rpm.html

https://packagecloud.io/rabbitmq/rabbitmq-server/

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.3-1.el7.noarch.rpm/download.rpm

这里官网的最新 用 官网的 (2020年3月10日)

然后依然是 安装

rpm -ivh rabbitmq-server-3.8.3-1.el7.noarch.rpm

最后检验是否安装成功

rabbitmqctl version

CTL是英文ConTroL(控制)的缩写

# Yum 安装

用 yum 安装,先配好 阿里云国内镜像啊 ,不然很慢 https://editor.csdn.net/md/?articleId=103931092

## 需要手动安装的依赖

https://www.rabbitmq.com/install-rpm.html#package-dependencies

  • logrotate 貌似不用

### erlang

https://www.rabbitmq.com/install-rpm.html#install-erlang-from-epel-repository

安装必要环境 erlang

 yum install erlang -y

检查是否安装

erl -version

### socat

yum install socat -y

socat -V

## rabbitmq-server

https://www.rabbitmq.com/install-rpm.html#package-cloud

https://packagecloud.io/rabbitmq/erlang/install#bash-rpm

# import the new PackageCloud key that will be used starting December 1st, 2018 (GMT)
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey

# import the old PackageCloud key that will be discontinued on December 1st, 2018 (GMT)
rpm --import https://packagecloud.io/gpg.key
# After importing both keys please follow the Package Cloud repository setup instructions.
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

yum install rabbitmq-server -y


检查

rabbitmqctl version

done(两种安装都介绍完了)

RabbitMQ 启动

rabbitmq常用命令 - https://blog.csdn.net/shaoyunzhe/article/details/96461703

# rabbitmq启动和停止命令

# 设置服务,开机自动启动
chkconfig rabbitmq-server on
# 建议使用 systemctl enable rabbitmq-server

# 启动服务
service rabbitmq-server start
# 建议使用 systemctl start rabbitmq-server

# 停止服务
service rabbitmq-server stop
# 建议使用 systemctl stop rabbitmq-server

查看状态:rabbitmqctl status

# 启动 rabbitmq 管理界面

# 开启管理界面插件
rabbitmq-plugins enable rabbitmq_management

查看插件打开情况:rabbitmq-plugins list

关闭监控管理器:rabbitmq-plugins disable rabbitmq_management

# 防火墙打开 15672 管理端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload

查看开启的端口
firewall-cmd --zone=public --list-ports

## 访问

访问服务器的15672端口,例如:

http://192.168.64.140:15672

# 添加用户

# 添加用户
rabbitmqctl add_user admin admin

查看用户:rabbitmqctl list_users

# 新用户设置用户为超级管理员
rabbitmqctl set_user_tags admin administrator
# 如果失败了 
# 开放 4369 端口– erlang发现端口
# firewall-cmd --zone=public --add-port=4369/tcp --permanent
# firewall-cmd --reload

查看用户:rabbitmqctl list_users

## 设置访问权限

登录:admin
密码:admin


# 开放客户端连接端口

# 打开客户端连接端口
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload

主要端口介绍

  • 4369 – erlang发现口
  • 5672 – client端通信口
  • 15672 – 管理界面ui端口
  • 25672 – server间内部通信口

查看开放的端口
firewall-cmd --zone=public --list-ports

rabbitmq六种工作模式

# 简单模式

RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

  • 发送消息的程序是生产者
  • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
  • 消费者等待从队列接收消息

## pom.xml

添加 slf4j 依赖, 和 rabbitmq amqp 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.tedu</groupId>
	<artifactId>rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.4.3</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

## 生产者发送信息

package rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		//创建连接工厂,并设置连接信息
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);//可选,5672是默认端口
		f.setUsername("admin");
		f.setPassword("admin");

		/* * 与rabbitmq服务器建立连接, * rabbitmq服务器端使用的是nio,会复用tcp连接, * 并开辟多个信道与客户端通信 * 以减轻服务器端建立连接的开销 */
		Connection c = f.newConnection();
		//建立信道
		Channel ch = c.createChannel();

		/* * 声明队列,会在rabbitmq中创建一个队列 * 如果已经创建过该队列,就不能再使用其他参数来创建 * * 参数含义: * -queue: 队列名称 * -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在 * -exclusive: 排他,true表示限制仅当前连接可用 * -autoDelete: 当最后一个消费者断开后,是否删除队列 * -arguments: 其他参数 */
		ch.queueDeclare("helloworld", false,false,false,null);

		/* * 发布消息 * 这里把消息向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含义: * -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是目标队列名称 * -props: 其他参数,例如头信息 * -body: 消息内容byte[]数组 */
		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());

		System.out.println("消息已发送");
		c.close();
	}
}