Stream

  • 官网
  • SpringCloud Stream 是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架。该框架提供了一个灵活的编程模型,建立在已经建立和熟悉的 Spring 熟语和最佳实践上,包括支持持久化的发布-订阅、消费组以及消息分区这三个核心概念
  • 应用程序通过 inputs 或者 outputs 来与 SpringCloud Stream 中的 Binder 对象交互。通过我们配置来 binding(绑定),而 SpringCloud Stream 的 Binder 对象负责与消息中间件交互。所以只需要搞清除如何与 SpringCloud Stream 交互就可以方便使用消息驱动的方式,屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
  • 通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。SpringCloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前支持 RabbitMQ、Kafka

MQ 模型

  • Message:生产者和消费者之间靠消息媒介传递消息内容
  • MessageChannel消息通道:消息必须走特定的通道
  • MessageHandler消息处理器:消息通道的消息消费、负责收发处理。通过订阅消息通道 MessageChannel 的子接口 SubscribeChannel

Stream 的作用

  • 统一底层差异:在没有绑定器这个概念的情况下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

SpringCloud Stream 的绑定器 Binder

  • 在没有绑定器这个概念的情况下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream 对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ 切换为 Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程
  • 通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离
  • Binder 可以生成 Binding,Binding 用来绑定消息容器的生产者和消费者,有两种类型,INPUT(对应于消费者) 和 OUTPUT(对应于生产者)

alt


Stream 的标准流程

  • Binder:连接中间件,屏蔽底层差异
  • Channel:通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过 Channel 对队列进行配置
  • Source 和 Sink:参照物为 Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入 alt

Stream 的使用

  • 引入依赖 spring-cloud-starter-stream-rabbit,在 yml 上添加相关配置

  • 生产者使用 @EnableBinding(Source.class) 绑定 channel 与 exchange;通过 MessageChannel 对象将消息发送过去 alt

  • 消费者使用也使用 @EnableBinding(Sink.class),通过 message.getPayload() 获取消费内容 alt

  • 总的来说: Source就像一个连接池 通过 output 发送到 source 池里面;然后消费者利用 sink 池进行接收 input 进行取出


Stream 避免消息重复消费

  • 如果一个订单同时被两个服务获取到,那么就会造成重复消费,可以使用 Stream 中的消息分组来解决;还可以使用 rabbitmq 里面的 Routing key 来分配给不同的队列解决
  • 微服务应用放置于同一个 Group 中,就能够保证消息只有被其中一个应用消费一次。不同的组是可以同时消费的,同一个组内会发生竞争关系,只有其中一个可以消费

alt


Stream 持久化

  • 当某些消费微服务宕机时,生产者继续生产的消息会被保留在之前的分组中,当原有分组的消费者启动时就会被消费,而新分组的消费者启动时会重新创建队列并监听,所以也就不会把这些未被消费的信息去消费了
  • 所以需要通过配置 group 来实现消息的持久化,避免消息丢失