• Namesrv: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
  • Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
  • Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
  • Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。
    1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
    2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
    3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
    4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建长连接,直接向Broker发消息。
    5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。

Topic

topic是消息的一种分类,消息发送/接收都要指定topic。topic和生产组/消费组之间是多对多的关系。

Queue

topic的细分。

Tag(EventCode)

每条发送的message都可以有一个tag;这样同一个topic可以按tag区分不同的业务场景。

在实践上,一个业务系统使用一个topic,用不同的tag区分不同的消息。

ConsumerGroup

在集群模式下(默认),一个消息只会被同一消费组中的一个节点消费;同一消费组的多个节点均衡的消费topic。在实践上,一个应用/微服务一个消费组。


Message是RocketMQ对消息的封装,我们也只能将消息封装为Message实例,才能通过RocketMQ发送出去。

集群模式(默认):

  • Consumer 实例平均分摊消费生产者发送的消息
  • 例子:订单消息,一般是只被消费一次(被标记为同一个 ConsumerGroup 组的消费者不会对消息重复消费)

广播模式:

  • 广播模式下消费消息:投递到 Broker 的消息会被每个 Consumer 进行消费,一条消息被多个 Consumer 消费,广播消费中 ConsumerGroup 暂时无用。
  • 例子:群公告,每个人都需要消费这个消息

怎么切换模式:通过 setMessageModel()

一个 Message 只有一个 Tag,Tag 是二级分类。过滤分为 Broker 端和 Consumer 端过滤。

  • Broker 端过滤,减少了无用的消息的进行网络传输,增加了 broker 的负担
  • Consumer 端过滤,完全可以根据业务需求进行过滤,但是增加了很多无用的消息传输

一般是监听 * ,或者指定 tag,|| 运算,SLQ92,FilterServer 等;

  • Tag 性能高,逻辑简单

  1. //使用生产者组名实例化一个生产者
  2. DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
  3. // 指定RocketMQ nameServer地址
  4. producer.setNamesrvAddr("10.0.10.63:9876");
  5. // 启动生产者
  6. producer.start();
  7. //创建Message实例
  8. Message msg = new Message("BenchmarkTest" , "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)
  9. //调用sendOneway()发送消息,该方法不会管消息是否发送成功
  10. producer.sendOneway(msg);
  11. //同步发送消息,根据sendResult结果处理
  12. SendResult sendResult = producer.send(msg);
  13. //异步发送消息
  14. producer.send(msg, new SendCallback() {
  15. public void onSuccess(SendResult sendResult) {
  16. //发送成功,业务处理
  17. }
  18. public void onException(Throwable e) {
  19. //发送异常,业务处理
  20. }
  21. });