1.为什么要使用Kafka?(为什么要使用消息队列)
(1) 缓冲与削峰
上游数据有时会有突发流量,下游可能抗不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
(2)解耦与扩展性
项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
(3)冗余
可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
(4)健壮性
消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要的业务的正常进行。
(5)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
2.Kafka的架构组成
消息队列的两种方式:
①点对点消息:一对一,生产者将消息发送到队列中,消费者主动拉取数据,消息收到后消息清除。
②发布-订阅消息: 一对多,消费者消费数据之后不会清除消息。发布/订阅模式有两种:
①队列推送数据(消费者能力不平均)
②消费者拉取消息(要维护一个长轮询)
kafka属于消费者主动拉取数据的模式。
Kafka是一个具有分区机制、副本机制和容错机制的分布式消息系统。
一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
(1)Broker
一个服务器就是一个broker,broker是无状态的(因为offset由consumer控制,所以kafka的broker是无状态的,它不需要标记哪些消息被哪些消费过。),broker使用zookeeper维护集群的状态。Broker的主要工作就是接收生产者发过来的消息,分配offset,之后保存到磁盘中。同时,接收消费者、其他Broker的请求,根据请求类型进行相应处理并返回响应。
(2)Zookeeper
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由Zookeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布与订阅任务。
(3)生产者
生产者将数据推送到broker上,当集群中出现新的broker时,所有的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
(4)消费者
因为Kafka的broker是无状态的,所以consumer必须使用partition offset来记录消费了多少数据。如果一个consumer指定了topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以指定offset,从topic的指定位置开始消费数据。consumer的offset存储在__consumeroffsets topic 中(不再是zk中,因为zk不适合大批量的频繁读写操作)。(元数据是以 topic 的形式存储在 kafka 中的,topic 名为:__consumer_offsets。并且也提供 了 kafka_consumer_groups.sh 脚本来帮助我们查看偏移量情况)
(5)Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
(6)Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,Leader会把这个Follower从"in sync replicas"(ISR)列表中删除,重新创建一个Follower。
(7)消费者组
consumer group是Kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或者消费者实例,它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题的所有分区。当然,每个分区只能由同一个消费组内的一个consumer来消费。
(8)Offset机制
每个consumer group保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。增加 __consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖。 __consumer_offsets中的消息保存了每个consumer group某一个时刻提交的offset信息。(Kafka内会保存着每个消费者组的在某个一个主题上的消费的offset)
3.消费者与Partition
3.1 一个消费者组
3.1.1 消费者数量小于分区数量
只有一个消费者时,消费者1将收到4个分区的全部消息。
当有两个消费者时,每个消费者将分别从两个分区接受消息。
3.1.2 消费者数量等于分区数量
当有四个消费者时,每个消费者都可以接受一个分区的消息。
3.1.3 消费者数量大于于分区数量
当有五个消费者时,会有闲置的消费者。
3.2 两个消费者组
消费者群组之间是互不影响的,如图:
4.Kafka为什么吞吐量大、速度快?
(1)顺序读写
Kafka是将消息记录持久化到本地磁盘中的,读写速度的快或慢关键在于寻址的方式。磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入。
每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分)。
而且,kafka会为每一个consumer group保留一些元信息,记录当前消息的position也就是offset。正常情况下Consumer会在消费完一条消息会递增该offset。当然如果把consumer的offset重新设置一个较小的值,就可以重新消费一些信息。因为offset由consumer控制,所以kafka的broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个consumer group只有一个consumer能够消费某一条消息,因此也就不需要锁机制,这也是为kafka的高吞吐量提供了有力保证。
(2)Page Cache(页缓存)
为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:
1.避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。 2.避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题
相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,提供了 write-behind、read-ahead以及flush等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。
通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
(3)零拷贝
linux操作系统 “零拷贝” 机制使用了sendfile方法, 允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。示意图如下:
通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。
当Kafka客户端从服务器读取数据时,如果不使用零拷贝技术,那么大致需要经历这样的一个过程:
1.操作系统将数据从磁盘上读入到内核空间的读缓冲区中。 2.应用程序(也就是Kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中。 3.应用程序将数据从用户空间的缓冲区再写回到内核空间的socket缓冲区中。 4.操作系统将socket缓冲区中的数据拷贝到NIC缓冲区中,然后通过网络发送给客户端。
(4)分区分段+索引
Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
(5)批量读写
Kafka数据读写也是批量的而不是单条的。
除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。
(6)批量压缩
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩.
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议.
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
5.Kafka中Zookeeper的作用有哪些?
(1)Broker注册
每个broker在启动时,都会到Zookeeper上进行注册,即到 /broker/ids 下创建属于自己的节点。Kafka使用全局唯一的数字来指代每个Broker服务器,不同的broker必须使用不同的broker ID进行注册,创建完节点后,每个broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型时是临时节点,一旦broker宕机,则对应的临时节点也会被自动删除。
(2)Topic注册
在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个broker上,这些分区信息及与broker的对应关系也都是由zookeeper在维护,由专门的节点来记录。如:/brokers/topics。
broker服务器启动后,会到对应topic节点上注册自己的broker ID并写入针对该topic的分区总数,这个分区的节点也是临时节点
(3)生产者负载均衡
由于同一个topic的消息会被分区并将其分布在多个Broker上,因此生产者需要将消息合理地发送到这些分布式的broker上,kafka支持Zookeeper方式实现负载均衡。由于每个broker启动时,都会完成broker注册过程,生产者会通过该节点的变化来动态地感知到broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
(4)消费者负载均衡
kafka需要进行负载均衡来实现多个消费者合理地从对应的broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的topic下面的消息,互不干扰。
(5)分区与消费者的关系
在kafka中,规定了每个消费分区只能被同组的一个消费者进行消费,因此,需要在zookeeper上记录 消息分区与consumer之间的关系,每个消费者一旦确定了对一个消息分区的权利,需要将其consumer ID写入到Zookeeper对应消息分区的临时节点上。
6.Kafka如何保证消息的有序性?
通过分区的概念,Kafka可以在多个消费者组并发的情况下提供较好的有序性和负载均衡。将每个分区只分发给一个消费者组,这样一个分区就只被这个组的一个消费者消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个消费者组之间进行负载均衡。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个消费者组消费它。
7.Kafka消息的存储机制
Kafka通过topic来分主题放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。都是持久化到磁盘,采用零拷贝技术。
8.如何解决消息丢失和消息重复?
要确定Kafka的消息是否丢失或者重复,从两个方面分析入手:消息发送和消息消费。
消息发送
Kafka消息发送有两种方式:同步和异步,默认是同步方式。
①同步:这个生产者写一条消息的时候,它就立马发送到某个分区去。
②异步:这个生产者写一条消息的时候,先写到某个缓冲区,这个缓冲区里的数据还没写到broker集群里的某个分区的时候,它就返回到client去了。
可通过produce.type属性进行配置。Kafka通过配置request.requireed.acks属性来确认消息的生产:
0 表示不进行消息接收是否成功的确认;
1 表示当Leader接收成功时确认;
-1 表示Leader和Follower都接收成功时确认;
(1)当 acks=0 时,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
(2)当 ack=1 时,在同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失。
消息消费
Kafka消息消费有两个consumer接口,Low-level API 和 High-level API:
Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
High-level API:封装了对partition 和 offset 的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就会消失。
解决如下:
①针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;
②针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。