文章目录
一、消息存储
分布式队列因为有高可靠性,保证消息不会丢失的要求,所以数据要进行持久化存储。.
1. 为什么要存储到文件系统?如何保证性能?
持久化方式可以分成两大类
- 关系型数据库:ActiveMQ默认采用的KahaDB做消息存储,由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。
- 文件系统:RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化,刷盘一般可以分为异步刷盘和同步刷盘两种模式
一般来讲性能对比上:文件系统>关系型数据库DB
RocketMq的文件存储系统有两点优化以保证性能:
- 消息存储(顺序写):RocketMQ的消息用
顺序写
,保证了消息存储的速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度,但是磁盘随机写的速度只有大概100KB/s - 消息发送(零拷贝):将本机磁盘文件的内容发送到客户端需要进行多次复制,比如从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;从用户态内存复制到网络驱动,最后从网络驱动复制到网卡中。RocketMq采用Java中零拷贝的技术,让从内核态内存复制到用户态内存这一步省略,直接赋值到网络驱动中
零拷贝技术有个限制是不能超过2G,所以RocketMQ默认设置单个CommitLog日志数据文件为1G
2. 加入持久化后RocketMq的架构是什么样的?
- 消息生成者发送消息
- MQ收到消息,将消息进行持久化,在存储中新增一条记录
- 返回ACK给生产者
- MQ push 消息给对应的消费者,然后等待消费者返回ACK
- 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
- MQ删除消息
3. 存储结构是什么样的?
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的
CommitLog
:消息真正的物理存储文件是CommitLog,默认一个文件一个G,存储的是Topic,QueueId和Message,一个存储满了会自动创建一个新的。ConsumeQueue
:是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址,为了加快消息的读取速度。消费者消费某条消息时,先查询索引获取CommitLog的对应的物理地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件,文件很小,通常会加载到内存中。如果该文件丢失或者损坏,可以通过CommitLog恢复
IndexFile
:也是个索引文件,为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
4. 刷盘机制有哪些?
- 同步刷盘(数据一定保存成功,但是速度慢):在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
- 异步刷盘(速度快,数据不一定保存成功):在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
5. 如何保证消息不丢失?
- RocketMq提供消息持久化机制,消息的刷盘策略分为同步刷盘和异步刷盘。同步刷盘即刷盘成功后再返回一个成功信息,能够保证数据一定保存成功,但是会降低系统吞吐量,异步刷盘与同步刷盘相反,我一般会采用同步刷盘的策略来保证消息不会丢失。
- RocketMq采用的文件系统存储而不是关系型数据库存储,因为在一般情况下文件系统的性能是比数据库性能高的
- 而RocketMq为了提高文件系统的读写的高性能,做了两点优化。第一点是采用顺序写的方式,这样可以大大提高磁盘写的性能。第二点采用了零拷贝,原来的文件读取流程是:从磁盘复制数据到内核态内存;从内核态内存复制到用户态内存;从用户态内存复制到网络驱动,最后从网络驱动复制到网卡中发送,零拷贝则省去了从内核态内存复制到用户态内存的这一过程,提高了读取的性能,但是零拷贝对文件大小有要求,所以RocketMq的持久化文件commitlog默认为1G。
- commitlog是存储了RocketMq的消息等核心信息,除此之外,还提供可一个ConsumeQueue作为持久化文件的索引,提高查询的效率,一般文件比较小,都是加载在内存中。除了ConsumeQueue之外,还会存储一个IndexFile文件,用来提供针对某一个key或者时间区间的查询。
二、高可用机制
RocketMq是天生支持分布式的,可以配置主从以及水平扩展
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
1. 消息消费的高可用(主从)
- 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
- RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。
2. 消息发送高可用(配置多个主节点)
- 在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。
3. 主从复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
- 同步复制:同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态。如果Master出故障, Slave上有全部的备份数据,容易恢复同步复制会增大数据写入延迟,降低系统吞吐量。
- 异步复制:异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失
- 通常情况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
三、负载均衡
1. Producer负载均衡
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
2. Consumer负载均衡
如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
-
消费者的集群模式–启动多个消费者就可以保证消费者的负载均衡(均摊队列)
-
默认使用的是均摊队列:会按照queue的数量和实例的数量平均分配queue给每个实例,这样每个消费者可以均摊消费的队列,如下图所示6个队列和三个生产者。
- 另外一种平均的算法环状轮流分queue的形式,每个消费者,均摊不同主节点的一个消息队列,如下图所示:
对于广播模式并不是负载均衡的,要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
四、消息重试机制
1. 顺序消息的重试
-
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。
-
因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
2. 无序消息的重试
-
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
-
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
-
消息队列 RocketMQ 默认允许每条消息
最多重试 16 次
,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,如果依然失败就会进入死信队列。 -
一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
-
也可以通过配置,让其不再重试,但是不建议这样
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
五、死信队列
死信消息具有以下特性:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
查看死信队列
- 在控制台查询出现死信队列的主题信息
- 在消息界面根据主题查询死信消息
- 选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
六、消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
1. 什么时候产生重复消息?
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
-
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
消费时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
2. 处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});