Leader选举

利用zk选举

zk的三个特性:

  1. watch机制
  2. 节点不允许重复写入
  3. 使用临时节点

  在开始时所有的副本都去zk创建一个临时节点,先创建成功的副本就成为leader,后面的副本都watch这个临时节点,leader挂掉后就会触发watch事件,所有的副本开始重新选举一个leader。这样的实现比较简单,但是会存在一定的弊端。如果分区和副本数量过多,所有的副本都直接进行选举的话,一旦出现节点的增减,就会触发大量的watch事件,zk的负载就会过重。
  kafka早期的版本就是使用zk选举的。

谁来主持选举?

  1. Controller选举
    kafka先在brokers里面选一个broker作为Controller主持选举。
  2. Controller是怎么被选举出来的?
    Controller是使用zookeeper选举出来的,每个broker都往zk里面写一个/controller节点,谁先写成功,谁就成为Controller。如果controller失去连接,zk上的临时节点就会消失。其它的broker通过watcher监听到Controller下线的消息后,开始选举新的Controller。
  3. Controller的责任
  • 监听Broker的变化。
  • 监听Topic变化
  • 监听Partition变化
  • 获取和管理Broker、Topic、Partition的信息
  • 管理Partition的主从信息

选举

  1. 谁可以参加选举?
    AR:全部副本
    ISR:保持同步的副本
    OSR:未同步的副本
    AR=ISR+OSR
    所以选举从ISR中进行选举。如果ISR中没有副本,只能从OSR中选举一个作为Leader,但是OSR中副本的数据可能会存在数据丢失,所以这个功能是可以配置的,默认是打开的。配置项:unclean.leader.election.enable = true/false
  2. Controller是怎么进行Leader选举的?
      kafka没有使用Paxos衍生算法,它们的思想归纳起来就是:先到先得、少数服从多数。这类算法可能会出现脑裂(节点不能互通的时候,出现多个leader)、惊群效应(大量的watch事件被触发)。
      kafka使用算法最贴近微软的PacificA算法。是一种优先算法。在这种算法重,默认是让ISR中第一个replica成为leader。

主从同步

  leader确定之后,客户端的读写只能操作leader节点。follower需要向leader同步数据。

LEO和HW

  • LEO(Log End Offset):下一条等待写入消息的offset(最新的offset+1)。
  • HW(High Watermark):ISR中最小的LEO。leader会将ISR中最小的LEO作为HW,图中是HW=6。
      consumer最多只能消费到HW之前的位置(消费到offset 5的消息)。也就是说其它的副本没有同步的消息是不能被消费的。

从节点怎么跟主节点保持同步?

  1. follower节点会向leader发送一个fetch请求,leader向follower发送数据后,更新follower的LEO。
  2. 当follower同步消息后,leader更新HW(ISR中最小的LEO)

replica故障处理

follower故障

follower发生故障后,首先会被踢出ISR。
follower故障恢复之后,从哪里开始同步数据?
假设replica1发生故障后恢复。

replica1在同步数据之前,会将之前记录HW之后的数据剔除掉(6,7),然后再向leader同步5之后的消息。追上leader之后(30s),重新加入ISR。

leader故障

leader发生故障后,会在将ISR中的下个broker选举为leader。为了保证数据一致,其它的follower需要把高于HW的消息截取掉,然后其它副本同步数据。
注意:这种机制只能保证副本之间的数据一致性,并不能保证数据不丢失。
如何设置保证数据不丢失?

  1. 使用带有回调的方法:producer.send(msg,callback)。根据回调,一旦出现消息提交失败的情况,做针对性的处理。
  2. 在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower。
  3. producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。
  4. 设置unclean.leader.election.enable=false
  5. 配置三个以上的副本
  6. 消费端确保消息消费完成后再提交。consumer端有个参数enable.auto.commit,设置为false使用手动提交。

kafka为什么这么快?

  1. 顺序读写
  2. 索引
  3. 批量读写和文件压缩
  4. 零拷贝

zero copy

内核空间;用户空间
DMA(Direct Memory Access) 直接内存访问

传统I/O模型


  传统I/O模型要先把数据从磁盘拷贝到内核缓冲区,然后拷贝到用户缓冲区,再拷贝到socket缓冲区,在拷贝到网卡设备。这里面发生了4次用户态和内存态的切换和4次数据拷贝,2次的系统调用(read、write),这个过程是非常费时间的。

zero copy I/O模型


kafka文件传输最终调用的是Java NIO库里的transferTo方法。