场景及其优越性

RabbitMQ消息队列,用于两个进程间的通信,基于AMQP协议(底层是TCP),所以可以跨网络跨机器通信;

  • 解耦合:微服务大行其道,java应用可以通过rabiitmq调用python应用,实现应用解耦合,而非jpython直接调用;
  • 异步与广播:java应用调用其他服务(比如python应用),不必要阻塞等待调用结束,可以直接继续做自己的事情;并且可以将一个消息广播给多个应用,让多个应用分别处理各自业务。
  • 缓存和削峰:这里的缓存指的是将要处理的业务逻辑缓存在消息队列中,削峰填谷处理业务逻辑,在时间上平摊服务压力。

AMQP协议

AMQP全称:advanced message queue protocol
图片说明

  • Server:接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

AMQP协议基于TCP协议,基于NIO复用TCP连接。客户端通过channel连接Exchange,将消息广播到绑定的queue,当然也可以是点对点单播;消费者从对应的queue中获取消息;

基础理论

安装

推荐docker安装rabbitmq

docker run -d --name rabbitmq \
    -p 5671:5671 \
    -p 5672:5672 \
    -p 4369:4369 \
    -p 25672:25672 \
    -p 15671:15671 \
    -p 15672:15672 \
    rabbitmq:management

端口说明

  • 4369 (epmd), 25672 (Erlang distribution)
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 15672 (if management plugin is enabled)
  • 61613, 61614 (if STOMP is enabled)
  • 1883, 8883 (if MQTT is enabled)

交换器Exchange

Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • Name
  • Durability (消息代理重启后,交换机是否还存在)
  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
  • Arguments(依赖代理本身)

交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

Headers Exchange(头交换机):头交换机类似与主题交换机,但是却和主题交换机有着很大的不同。主题交换机使用路由键来进行消息的路由,而头交换机使用消息属性来进行消息的分发,通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。在头交换机里有一个特别的参数”x-match”,当"x-match"的值为"ant"时,只需要消息头的任意一个值匹配成功即可,当”x-match”值为“all”时,要求消息头的所有值都需相等才可匹配成功。

队列

AMQP中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。

  • Name
  • Durable(消息代理重启后,队列依旧存在)
  • Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
  • Auto-delete(当最后一个消费者退订后即被删除)
  • Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)

队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。

队列名称

队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。

以"amq."开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。

队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

消息属性和有效载荷(消息主体)

AMQP模型中的消息(Message)对象是带有属性(Attributes)的.

  • Content type(内容类型)
  • Content encoding(内容编码)
  • Routing key(路由键)
  • Delivery mode (persistent or not)
    投递模式(持久化 或 非持久化)
  • Message priority(消息优先权)
  • Message publishing timestamp(消息发布的时间戳)
  • Expiration period(消息有效期)
  • Publisher application id(发布应用的ID)

消息确认

消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:

  • 当消息代理(broker)将消息发送给应用后立即删除。(使用AMQP方法:basic.deliver或basic.get-ok)
  • 待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)

前者被称作自动确认模式(automatic acknowledgement model),后者被称作显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择什么时候发送确认回执(acknowledgement)。应用可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执(例如,成功获取一个网页内容并将其存储之后)。

如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

虚拟主机

为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。

SpringBoot应用RabbitMQ

pom:spring-boot-starter-amqp

yml:

spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: root
#    #虚拟host 可以不设置,使用server默认host
#    virtual-host: JCcccHost

exchange和queue绑定关系配置

springboot通过@Configuration配置消息队列,交换机,路由键绑定关系

点对点的直连交换机

@Configuration
public class DirectRabbitConfig {

    //队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue", true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange(){
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange", true, false);
    }
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect(){
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }
}

主题交换机

@Configuration
public class TopicRabbitConfig {
   //绑定键
   public final static String man = "topic.man";
   public final static String woman = "topic.woman";
   @Bean
   public Queue firstQueue(){
      return new Queue(TopicRabbitConfig.man);
   }
   @Bean
   public Queue secondQueue(){
      return new Queue(TopicRabbitConfig.woman);
   }
   @Bean
   TopicExchange exchange(){
      return new TopicExchange("topicExchange");
   }
   //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
   //这样只要是消息携带的路由键是topic.man,才会分发到该队列
   @Bean
   Binding bindingExchangeMessage(){
      return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
   }
   //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
   // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
   //*:表示一个词,#:表示零个或多个词
   @Bean
   Binding bindingExchangeMessage2(){
      return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
   }
}

消息生产者Controller

rabbitmq对于消息的处理基于RabbitMQTemplate,消息都会被保存到map中,RabbitMQTemplate使用threadLocal保证并发安全;

{
@Autowired
RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

@GetMapping("/sendDirectMessage")
public String sendDirectMessage(){
   String messageId = String.valueOf(UUID.randomUUID());
   String messageData = "test message, hello!";
   String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
   Map<String, Object> map = new HashMap<>();
   map.put("messageId", messageId);
   map.put("messageData", messageData);
   map.put("createTime", createTime);
   //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
   rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
   return "ok";
}
}

消息消费者***

消息消费者基于***实现消息处理

@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
    @RabbitHandler
    public void process(Map testMessage){
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }
}
运行结果:
DirectReceiver消费者收到消息  : {createTime=2020-08-04 14:46:30, messageId=52e42a12-0cfd-4117-a3ad-10ed42569cc0, messageData=test message, hello!}

消息确认

RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

  • 实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
  • 实现returnedMessage接口,消息从交换器发送到对应队列失败时触发
    • 如果消息没有到exchange,则confirm回调,ack=false
    • 如果消息到达exchange,则confirm回调,ack=true
    • exchange到queue成功,则不回调return
    • exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

消息确认模式
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认

package com.tang.rabbitmq.Config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;

import static org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

public class RabbitTemplateConfig implements ConfirmCallback, ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    //消息发送到交换器Exchange后触发回调
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        System.out.println("消息唯一标识:" + correlationData);
        System.out.println("确认结果:" + ack);
        System.out.println("失败原因:" + cause);
    }

    // 消息从交换器发送到对应队列失败时触发
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
        System.out.println("消息主体message:" + message);
        System.out.println("消息主体message:" + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息交换器exchange:" + exchange);
        System.out.println("消息路由键routing: " + routingKey);
    }
}

集群

主备模式:不能保存消息队列,消息可能会丢失;主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点
镜像模式:一个结点有多个完全的镜像复制,
多活模式:这种模式是实现异地数据复制的主流模式,这种模型以来RabbitMQ的federation插件,可以实现持续的可靠的AMQP数据通信,多活模式在实际配置与应用非常简单;RabbitMQ部署架构采用双中心模式(多中心),那么在两套(多套)数据中心各部署一套RabbitMQ集群,各中心的RabbitMQ服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享