发送消息的幂等性
Broker有判断producer生产消息幂等性的功能:
具体设置:
enable.idempotence=true/false
原理
- PID(Producer ID)
- sequence number
生产者都要有一个唯一的编号,就是PID。每一条消息都要有一个sequence number,如果消息的sequence number小于服务端存储的最大编号,则判定该消息为重复消息。
kafka只保证单个partition的消息具有顺序性,并不保证整个topic的消息具有顺序性。
kafka的事务
什么时候需要事务?
- 发送多条消息:例如在分布式微服务系统中的,我们添加增加一条数据可能需要给多个服务发送消息,这个时候就要用到事务来保证整个发送动作的原子性。
- 发送消息到多个topic或者多个partition。一条消息需要发送到多个地方,也需要保证原子性。
- 消费以后发出消息。就是在消费一条消息后发出一条消息,发送消息失败则消费消息也失败,consume-process-produce。
事务的使用
//初始化事务
producer.initTransactions();
try{
//开启一个事务
producer.beginTransaction();
//发送消息逻辑 ...
//提交事务
producer.commitTransaction();
}catch (KafkaException e){
//终止事务
producer.abortTransaction();
}
事务实现原理
- 2PC
- Transaction Coordinator(事务协调者)
- 事务日志:topic_transaction_state
- 生产者事务ID:transaction.id
A:生产者通过initTransactions API向Coordinator注册事务ID。
B:Coordinator记录事务日志。
C:生产者把消息写入目标分区。
D:分区和Coordinator的交互。当事务完成以后,消息的状态应该是已提交,这样消费者才可以消费到。
Producer发送消息流程及原理
kafka发送消息主要经过下图流程:
消息先经过拦截器进行预处理,经过序列化器序列化,然后由分区器指定分区,后面追加到累加器。当累加器内消息达到设定的阈值后就会触发Sender线程发送消息。
我们先看下发送消息的代码:
KafkaProducer<String, String> producer = new KafkaProducer<>(pros);
producer.send(new ProducerRecord<>("topic","key","value"));
拦截器
看其中的send()方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
可以看到在发送消息前会执行interceptors方法onSend()。这是个拦截器链,可以在消息发送之前做一些定制化的操作。拦截器我们可以自己实现:
public class LogInterceptor implements ProducerInterceptor {
@Override
public ProducerRecord onSend(ProducerRecord record) {
System.out.println("发送了一条消息:"+record.value());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("服务端接收到消息1");
}
@Override
public void close() {
System.out.println("拦截器关闭");
}
@Override
public void configure(Map<String, ?> configs) {
}
}
我们简单实现了一个发送消息打印出消息内容的拦截器,然后将拦截器加入到producer中:
List<String> interceptors = new ArrayList<>(1);
interceptors.add("com.example.demo.LogInterceptor");
pros.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
producer.send(new ProducerRecord<>("topic","key","key"));
这样在发送消息的时候就会经过拦截器拦截。
序列化
进入doSend()方法:
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
可以看到对消息key和value进行了序列化。我们也可以使用一些其它的序列化工具,例如:ProtoBuf。
序列化配置:
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
分区选择
在doSend()方法中,有以下代码:
int partition = partition(record, serializedKey, serializedValue, cluster);
partition()方法主要是使用分区器(Partitioner)指定消息的目标分区。
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//获取消息指定的分区
Integer partition = record.partition();
//如果指定了则发送到指定的分区,如果未指定则使用partitioner进行选择分区
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
可以看到Partitioner有三个实现类:
- DefaultPartitioner:默认分区器,根据key进行hash计算指定到分区。
- RoundRobinPartitioner:轮询分区器
- UniformStickyPartitioner:粘滞分区器
自定义分区器
我们可以自定义分区器,实现Partitioner后,配置分区器:
//可以指定kafka分区器和自定义分区器,没有指定分区器就会使用默认的分区器。
props.put("partitioner.class", "com.hj.kafka.producer.MyParatitioner");
批量发送消息
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
消息并不会立即发送,而是追加到累加器中,到达一定数量或时间后才会一起发送消息。
累加器RecordAccumulator使用了ConcurrentHashMap来存储消息:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
判断累加器是否写满或到时间,达到阈值则批量发送消息:
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
//线程发送
this.sender.wakeup();
}
在Producer config中可以进行配置:
//多少数据发送一次,默认16k
pros.put("batch.size",16384);
//批量发送的等待时间
pros.put("linger.ms",5);
发送消息服务端响应设置
副本消息同步
当Producer将消息发送到分区主节点时,主分区需要发送ack信号给producer来确认消息接收完成。但是当kakfa有多个broker时,就需要考虑副本是不是同步完成。如下图:
kafka副本同步数据有三种设置:
min.insync.replicas=0/1/-1
- Ack = 0:Producer不等待副本同步完消息后Learder发送ack信号,消息发送完毕直接进行下一轮发送。所以该配置情况下,集群可靠性最低,延迟性也是最低,Broker在未接收到消息时宕机了就会消息丢失。
- Ack = 1:Leader收到消息后就发送ack信号到Producer,并不等待所有副本同步完成。当follower未同步消息时Leader挂掉,就会造成消息丢失。
- Ack = -1:Leader接收到消息后等收到所有follower同步完成的ack信号后,再发送ack信号给Producer。这种情况下可靠性最高,单延迟性也是最高的。
哪些节点参与同步?
Leader需要等待所有副本同步完毕才返回Ack信号,这就意味着若一个follower失去与leader的连接时,就会造成leader接收不到ack信号,导致消息接收失败。针对解决这个问题,kafka使用了ISR来做响应。
- ISR:in-sync replica set。与leader保持同步的节点。
配置replica.lag.time.max.ms:如果一个follower在这个时间内没有发送fetch请求或消费leader日志到结束的offset,leader将从ISR中移除这个follower,并认为这个follower已经挂了。
副本分配及消息存储
kafka中副本的数量一定要小于等于机器的节点数,因为当副本数>机器节点数时,肯定有个机器上有多个副本,当这个机器挂掉后,这个机器上的副本都不可用。
副本分配规则
当设置副本数小于机器节点数的时候,肯定需要考虑所有分区在各个副本上怎样分配才均匀。kafka使用了以下方式来进行副本分配:
- 副本因子不能大于Broker的个数。
- 第一个分区(编号为0)的第一个副本放置位置是随机从BrokerList选择的。
- 其它分区的第一个副本的放置位置是相对于0分区的第一个分区位置依次后移。
- 剩下的副本随机分配。
例如:3个Broker,4个分区,2个副本。红色的都是leader节点。
为什么要将leader节点错开分布?
防止broker挂掉导致触发多个leader选举。
消息存储位置
kafka会将消息存储在一个log文件中:
- .index:消息的offset索引文件
- .log:存储消息的文件
- .timeindex:消息的时间索引文件
segment
一个分区内的消息并不一直存储在一个log文件中。当数据量越来越多时会才分为segment存储,每个segment都配两个索引文件,是配套存在的:
配置每个segment大小和轮转时间:
索引文件也可以设置大小,满了后就会创建一套文件:
注意kafka的索引并不是每一条消息都会建立索引,而是一种稀疏索引sparse index。这个索引到底有多稀疏?也就是说隔几条才产生一个索引记录?
- 实际上是用消息的大小来控制的,默认是4kb:
思考:为什么kafka不用B+ Tree?
kafka是大数据量写入,如果kafka使用B+Tree作为索引,会在写入的时候重建索引,树的调整是非常耗性能的。
索引查找过程
- 根据offset判断在哪个segment中。
- 在segment的indexfile中,根据offset查找怕position。
- 根据position从log文件中找到消息。
消费记录
kakfa会创建50个__consumer_offset topic用来存储消费者的消费offset:
消息保留(清理)机制
开关与策略
清理策略有两种:
- delete:直接删除。定时任务实现,默认5分钟执行一次。
- compact:将key相同的进行整理,保留最新的一条。

京公网安备 11010502036488号