Leader选举
利用zk选举
zk的三个特性:
- watch机制
- 节点不允许重复写入
- 使用临时节点
在开始时所有的副本都去zk创建一个临时节点,先创建成功的副本就成为leader,后面的副本都watch这个临时节点,leader挂掉后就会触发watch事件,所有的副本开始重新选举一个leader。这样的实现比较简单,但是会存在一定的弊端。如果分区和副本数量过多,所有的副本都直接进行选举的话,一旦出现节点的增减,就会触发大量的watch事件,zk的负载就会过重。
kafka早期的版本就是使用zk选举的。
谁来主持选举?
- Controller选举
kafka先在brokers里面选一个broker作为Controller主持选举。 - Controller是怎么被选举出来的?
Controller是使用zookeeper选举出来的,每个broker都往zk里面写一个/controller节点,谁先写成功,谁就成为Controller。如果controller失去连接,zk上的临时节点就会消失。其它的broker通过watcher监听到Controller下线的消息后,开始选举新的Controller。 - Controller的责任
- 监听Broker的变化。
- 监听Topic变化
- 监听Partition变化
- 获取和管理Broker、Topic、Partition的信息
- 管理Partition的主从信息
选举
- 谁可以参加选举?
AR:全部副本
ISR:保持同步的副本
OSR:未同步的副本
AR=ISR+OSR
所以选举从ISR中进行选举。如果ISR中没有副本,只能从OSR中选举一个作为Leader,但是OSR中副本的数据可能会存在数据丢失,所以这个功能是可以配置的,默认是打开的。配置项:unclean.leader.election.enable = true/false - Controller是怎么进行Leader选举的?
kafka没有使用Paxos衍生算法,它们的思想归纳起来就是:先到先得、少数服从多数。这类算法可能会出现脑裂(节点不能互通的时候,出现多个leader)、惊群效应(大量的watch事件被触发)。
kafka使用算法最贴近微软的PacificA算法。是一种优先算法。在这种算法重,默认是让ISR中第一个replica成为leader。
主从同步
leader确定之后,客户端的读写只能操作leader节点。follower需要向leader同步数据。
LEO和HW
- LEO(Log End Offset):下一条等待写入消息的offset(最新的offset+1)。
- HW(High Watermark):ISR中最小的LEO。leader会将ISR中最小的LEO作为HW,图中是HW=6。
consumer最多只能消费到HW之前的位置(消费到offset 5的消息)。也就是说其它的副本没有同步的消息是不能被消费的。
从节点怎么跟主节点保持同步?
- follower节点会向leader发送一个fetch请求,leader向follower发送数据后,更新follower的LEO。
- 当follower同步消息后,leader更新HW(ISR中最小的LEO)
replica故障处理
follower故障
follower发生故障后,首先会被踢出ISR。
follower故障恢复之后,从哪里开始同步数据?
假设replica1发生故障后恢复。
replica1在同步数据之前,会将之前记录HW之后的数据剔除掉(6,7),然后再向leader同步5之后的消息。追上leader之后(30s),重新加入ISR。
leader故障
leader发生故障后,会在将ISR中的下个broker选举为leader。为了保证数据一致,其它的follower需要把高于HW的消息截取掉,然后其它副本同步数据。
注意:这种机制只能保证副本之间的数据一致性,并不能保证数据不丢失。
如何设置保证数据不丢失?
- 使用带有回调的方法:producer.send(msg,callback)。根据回调,一旦出现消息提交失败的情况,做针对性的处理。
- 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower。
- producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。
- 设置unclean.leader.election.enable=false
- 配置三个以上的副本
- 消费端确保消息消费完成后再提交。consumer端有个参数enable.auto.commit,设置为false使用手动提交。
kafka为什么这么快?
- 顺序读写
- 索引
- 批量读写和文件压缩
- 零拷贝
zero copy
内核空间;用户空间
DMA(Direct Memory Access) 直接内存访问
传统I/O模型
传统I/O模型要先把数据从磁盘拷贝到内核缓冲区,然后拷贝到用户缓冲区,再拷贝到socket缓冲区,在拷贝到网卡设备。这里面发生了4次用户态和内存态的切换和4次数据拷贝,2次的系统调用(read、write),这个过程是非常费时间的。
zero copy I/O模型
kafka文件传输最终调用的是Java NIO库里的transferTo方法。