• 为什么要使用 kafka?
  • kafka的数据可靠性怎么保证
  • Kafka的数据是放在磁盘上还是内存上,为什么速度会快?
  • 副本数据同步策略
  • 故障处理
  • kafka事务是怎么实现的
  • Kafka为什么不支持读写分离?
  • Kafka的数据是放在磁盘上还是内存上,为什么速度会快?
  • 总结

为什么要使用 kafka?

  1. 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
  2. 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
  3. 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
  4. 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
  5. 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

 

kafka的数据可靠性怎么保证

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。所以引出ack机制。 ack应答机制 Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。acks参数配置:

  • 0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。
  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据。

  • -1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

Kafka的数据是放在磁盘上还是内存上,为什么速度会快?

kafka使用的是磁盘存储。速度快是因为:顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

Kafka高效文件存储设计: Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用 。通过索引信息可以快速定位 message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件), 可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

注:Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。

为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。

副本数据同步策略

选择最后一个的原因:

  1. 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  2. 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。

ISR如果采用全部完成同步,才发送ack的副本的同步策略的话:提出问题:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

故障处理

LEO:指的是每个副本最大的offset。HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。 kafka的消费分区分配策略 一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费 Kafka有三种分配策略,**一是RoundRobin,一是Range。高版本还有一个StickyAssignor策略 **

将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:同一个 Consumer Group 内新增消费者。消费者离开当前所属的Consumer Group,包括shuts down或crashes。

  • Range分区分配策略

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9消费者线程为C1-0,C2-0,C2-1那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。

在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0:0,1,2,3

C2-0:4,5,6

C2-1:7,8,9

如果有11个分区将会是:

C1-0:0,1,2,3

C2-0:4,5,6,7

C2-1:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1-0:T1(0,1,2,3) T2(0,1,2,3)

C2-0:T1(4,5,6) T2(4,5,6)

C2-1:T1(7,8,9) T2(7,8,9)

  • RoundRobinAssignor分区分配策略

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者. 使用RoundRobin策略有两个前提条件必须满足:同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;每个消费者订阅的主题必须相同。

加入按照 hashCode 排序完的topic-partitions组依次为

T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9

我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1

最后分区分配的结果为:

分区C1-0 将消费 T1-5, T1-2, T1-6

分区C1-1 将消费 T1-3, T1-1, T1-9

分区C2-0 将消费 T1-0, T1-4

分区C2-1 将消费 T1-8, T1-7

  • StickyAssignor分区分配策略

Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。

鉴于这两个目的, StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。

假设消费组内有3个消费者C0、C1、C2,它们都订阅了4个主题:t0、t1、t2、t3并且每个主题有2个分区,也就是说整个消费组订阅了t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1

这8个分区最终的分配结果如下:

消费者C0:t0p0、t1p1、t3p0

消费者C1:t0p1、t2p0、t3p1

消费者C2:t1p0、t2p1

这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。

如果采用 RoundRobinAssignor策略 ,那么此时的分配结果如下:

消费者C0:t0p0、t1p0、t2p0、t3p0

消费者C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。

而如果此时使用的是 StickyAssignor策略 ,那么分配结果为:

消费者C0:t0p0、t1p1、t3p0、t2p0

消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。 StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有3个消费者:C0、C1、C2集群中有3个主题:t0、t1、t2

这3个主题分别有1、2、3个分区也就是说集群中有t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区

消费者C0订阅了主题t0

消费者C1订阅了主题t0和t1

消费者C2订阅了主题t0、t1和t2

如果此时采用RoundRobinAssignor策略:

消费者C0:t0p0

消费者C1:t1p0

消费者C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是StickyAssignor策略:

消费者C0:t0p0

消费者C1:t1p0、t1p1

消费者C2:t2p0、t2p1、t2p2

此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为

消费者C1:t0p0、t1p1

消费者C2:t1p0、t2p0、t2p1、t2p2

StickyAssignor策略,那么分配结果为:

消费者C1:t1p0、t1p1、t0p0

消费者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。

从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。

kafka事务是怎么实现的

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

  • Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

  • Consumer事务

对于Consumer而言,事务的保证就会相对较弱,尤其是无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。Exactly Once语义 将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。

在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。

对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。0.11版本的Kafka,引入了一项重大特性:幂等性。开启幂等性enable.idempotence=true。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:At Least Once + 幂等性 = Exactly OnceKafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对< PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。补充,在流式计算中怎么Exactly Once语义?以flink为例

  • souce:使用执行ExactlyOnce的数据源,比如kafka等

内部使用FlinkKafakConsumer,并开启CheckPoint,偏移量会保存到StateBackend中,并且默认会将偏移量写入到topic中去,即 _ consumer_offsets Flink设置
CheckepointingModel.EXACTLY_ONCE

Kafka为什么不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

Kafka的数据是放在磁盘上还是内存上,为什么速度会快?

Kafka使用的是磁盘存储。速度快是因为:顺序写入。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。

Kafka高效文件存储设计:Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。

通过索引信息可以快速定位message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件),可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

总结

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的 Message的时候,用二分查找就可以定位到该Message在哪个段中。为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。

原文链接:
https://mp.weixin.qq.com/s?__biz=Mzg2ODU1MDkwMw==&mid=2247486303&idx=1&sn=271ce974d598b890207218a136d60089&utm_source=tuicool&utm_medium=referral