Kafka中的Controller

Controller是什么?有什么用?

Controller是Kafka集群中的一个Broker,主要负责管理集群中分区和副本的状态,比如某个分区的leader节点故障了,controller会负责选举出新的分区leader,并通知其他broker更新Isr列表;同样新增分区时也会让其他的broker感知到。

Controller是如何选举出来的?

集群中的每个Broker在启动时都会尝试去往ZK上创建一个/controller节点,成功创建节点的Broker就会被选举为Controller,而创建失败的Broker会监听这个临时节点,当选举为Controller的Broker宕机了,其他Broker会重新去竞争创建/controller节点,同样的,创建成功的节点会被选举为Controller。Broker成为Controller之后会监听ZK中的一些节点,比如/brokers/ids(监听broker变化),/brokers/topics(监听topic变化),/brokers/topics/[topic](监听topic的分区变化)

[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 2] get -s /controller
{"version":1,"brokerid":2,"timestamp":"1619617224124"}
cZxid = 0x345
ctime = Wed Apr 28 21:40:24 CST 2021
mZxid = 0x345
mtime = Wed Apr 28 21:40:24 CST 2021
pZxid = 0x345
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000418e0970022
dataLength = 54
numChildren = 0

分区Leader又是如何选举的?

正常情况

由于Controller监听了/brokers/ids节点,当有Broker宕机时,Controller会判断当前Broker上是否有分区leader,如果有,则会对该分区重新发起选举,选择Isr列表中的第一个Broker作为分区leader。

特殊情况

➜  kafka_2.12-2.6.2 bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-replicate-partition-topic
Topic: multi-replicate-partition-topic    PartitionCount: 2    ReplicationFactor: 3    Configs:
    Topic: multi-replicate-partition-topic    Partition: 0    Leader: 1    Replicas: 3,1,2    Isr: 1,3

broker.id为2那台机器由于没有完成同步备份导致没出现在Isr列表中时且1和3都宕机了,此时会根据unclean.leader.election.enable参数来决定选举策略,配置默认值false,表示只会在Isr中选择第一个Broker,因为Isr列表为空,选举阻塞且集群不可用;若设置为true,则会从Replicas中选举出2作为新leader,但是这种情况为了提供可用性,舍弃了数据的完整性。

消费者Rebalance机制

前面提过消费者的几种消费方式,当消费者在未指定分区消费时,才可能发生Rebalance,且Rebalance期间,消费者是无法继续消费消息的,因此要避免在高峰期发生Rebalance。

常见的Rebalance场景

  • 消费组中新增或减少了消费者个数
  • Topic增加了分区
  • 消费组订阅了其他的Topic

Rebalance策略

可通过partition.assignment.strategy消费者参数配置重分配策略,常见的策略有range、round-robin、sticky。假设topic分区数5(0-4)个,消费组有2个消费者。

  • range策略(默认策略)
    n=分区数/消费者数=2,m=分区数%消费者数=1
    则前m个消费者每个分配n+1个分区,后面的消费者每个分配n个分区,即上述案例的分配结果为[0,1,2]分配给第一个消费者,[3,4]分配给第二个消费者。
  • round-robin策略
    轮询分配,所以上述案例的分配结果为[0,2,4]分配给第一个消费者, [1, 3]分配给第二个消费者。
  • sticky策略
    与前两个策略不同的是,该策略在保证均匀分配的前提下,尽量保留之前的分配结果。比如消费者挂掉之后多出的分区会进行重分配,在均匀分配的前提下,已经分配好的分区是不会重新分配的。

Rebalance过程

根据消费组id可以计算出该消费组的消息提交到__consumer_offsets具体分区号,该分区所处的broker即为组协调器,消费者启动时,第一个成功连接到组协调器的消费者会被选为消费组协调器,通过消费组协调器制定的分区方案会通过组协调器分发给消费组内的其他消费者,消费者会根据分区方案进行消费消息。

什么是高水位(HW)

当一个分区存在多副本时,每个副本都会维护自己的LEO(消息文件末尾偏移量), HW就是ISR副本列表中最小的LEO,HW限制消费者只能消费到HW位置的消息,所以新消息写入时,只有等到ISR副本都同步完新消息后,HW也会更新,此时消费者才能消费到新消息。

Kakfa为什么性能那么高?

  • 顺序读写,生产者生产消息的时候都是以追加的形式写文件保证了磁盘顺序写;同时消息不能修改或删除保证了磁盘顺序读。
  • 传输数据时零拷贝
    非零拷贝场景: 磁盘文件 -> 内核空间 -> 用户空间 -> socket缓冲区 -> 网卡接口
    零拷贝场景: 磁盘文件 -> 内核空间 -> 网卡接口
    Kafka使用了mmap,mmap操作提供了一种机制,让用户程序直接访问设备内存,这种机制,相比较在用户空间和内核空间互相拷贝数据,效率更高。同时Kafka还使用了I/O高级函数sendfile,在通过DMA(Direct Memory Access,直接存储器访问)把硬盘数据拷贝到内核缓冲区后,DMA把数据从内核缓冲区直接拷贝给网卡接口。
  • 数据批量处理