Kafka配置
生产者常见配置项
acks(默认值1)
- acks=0 生产者不会等待broker的任何回复,这种情况下不能保证消息被broker接收。
- acks=1 生产者只会等待leader节点将数据写入到本地log,不关心follower是否成功同步到消息。若此时leader发生故障且没有将新消息同步给follower,新消息会丢失。
- acks=-1 or acks=all
生产者需要等待leader同步消息给相应数量的follower,该数量由
min.insync.replicas
配置(默认为1,推荐配置大于等于2)决定。
retries
设置失败重试次数,失败重试一定程度上可以保证消息发送的可靠性,也有可能出现消息重复发送(如网络抖动,生产者未收到broker的响应,实际消息已成功发送),此时需要注意消息的幂等性,避免重复消费。
retry.backoff.ms
重试间隔时间,默认100ms
buffer.memory
设置本地缓冲区大小,默认值33554432字节(32M)。生产者发送消息时,会先将消息发送到本地缓冲区,再由本地线程从缓冲区读取数据批量发送给broker。
batch.size
批量发送消息的大小,默认值16384字节(16K)。当从缓冲区读取数据到达16k时即发送。
linger.ms
消息徘徊时间,默认值0,表示消息必须立即发送。若超过徘徊时间还未能从缓冲区读取到16k的数据,这些数据也必须发送。
消费者常见配置项
enable.auto.commit
设置是否自动提交,默认值true。消费者拉取到消息后自动提交偏移量。
auto.commit.interval.ms
自动提交间隔,默认值5000,表示自动提交任务每隔5秒执行一次。
自动提交可能引起的问题:
- 重复消费:若读取到消息后,业务代码的执行小于自动提交间隔时间,若在业务逻辑已处理完成后,还未提交偏移量,此时消费者故障重启后,还会拉取到故障时间点之前的历史数据,也就是重复消费问题。
- 丢消息: 若读取到消息后,业务代码的执行时间大于自动提交间隔时间且过了自动提交时间点,消息还在逐条处理,此时消费端故障重启后就拉取不到故障时间点之后的历史消息了,也就是消息丢失了。
auto.offset.reset
设置新加入的消费组从哪开始消费。
- latest 默认值,只会消费启动后的新消息
- earliest 启动时从头消费
max.poll.records
设置消费者一次最多拉取的消息个数。默认值500
max.poll.interval.ms
若消费者poll间隔时间超过该值,broker会认为消费者处理能力弱并将消费者移出消费组,分区数据分配给别的消费者消费。默认值300000(5分钟)
session.timeout.ms
设置超时时间,消费者超过该时间后还未发送心跳,broker也会将消费者移出消费组,分区数据分配给别的消费者消费,默认值10000(10秒)。
heartbeat.interval.ms
设置发送心跳的间隔时间,默认值3000(3秒)
消费者常见消费方式
指定主题消费
public class Consumer {
private final static String TOPIC_NAME = "multi-replicate-partition-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 手动同步提交
consumer.commitSync();
}
}
}
}
消费指定分区
public class Consumer {
private final static String TOPIC_NAME = "multi-replicate-partition-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 手动同步提交
consumer.commitSync();
}
}
}
}
从头开始消费
public class Consumer {
private final static String TOPIC_NAME = "multi-replicate-partition-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(Arrays.asList((topicPartition)));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 手动同步提交
consumer.commitSync();
}
}
}
}
从指定偏移量位置开始消费
public class Consumer {
private final static String TOPIC_NAME = "multi-replicate-partition-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seek(topicPartition, 2);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
if (records.count() > 0) {
// 手动同步提交
consumer.commitSync();
}
}
}
}
指定时间点消费
public class Consumer {
private final static String TOPIC_NAME = "multi-replicate-partition-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 一小时前
long oneHourOffsetTime = System.currentTimeMillis() - 1000 * 60 * 60;
// 获取指定topic的分区列表
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(TOPIC_NAME);
Map<TopicPartition, Long> offsetMap = new HashMap<>();
for (PartitionInfo partitionInfo : partitionInfoList) {
offsetMap.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()), oneHourOffsetTime);
}
// 获取每个分区对应时间点的offset
Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(offsetMap);
// 根据获取到的offset,再指定offset消费每个分区
timestampMap.forEach((topicPartition, offsetAndTimestamp) -> {
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offsetAndTimestamp.offset());
// 消费逻辑省略...
});
}
}
Kafka的日志存储
Kafka日志存储在log.dirs
配置的路径下, 一个分区一个目录,目录以TOPIC+PARTITION
方式命名,如下图:
消息日志存储是分段的,且每段日志都会生成一个日志文件, 可以通过log.segment.bytes
参数设置每段的大小(最大1G)。
# 存储一部分消息的offset值,kafka每往分区发4k(默认值,支持配置)消息时就会写一次offset到index文件
# 所以在通过offset定位消息时,先经过index定位一次,再去log文件中查具体的消息。
# 稀疏索引
00000000000000000000.index
# 存储对应分区的offset和消息
00000000000000000000.log
# 也是用于存储一部分消息的offset值,kafka每往分区发4k(默认值,支持配置)消息时就会将当前消息的发送时间和offset写入到timeindex文件中。
# 所以在通过时间来定位消息时,会先经过timeindex文件查找offset。
00000000000000000000.timeindex
00000000000000000006.index
00000000000000000006.log
00000000000000000006.timeindex
常见问题及解决方案
消息丢失问题
- 生产方
通过设置
acks
可以一定程度上解决消息丢失问题,[0,1,-1]数据保证级别从低到高。 - 消费方 使用手动提交offset。
消息重复问题
- 生产方 因网络抖动,生产方重试机制导致消息重复发送。需要做消息幂等处理。
- 消费方 消费方拉取到消息后并且消费完毕,此时消费者服务宕机,未提交offset,下次重启后,还会消费到相同的消息。也需要做消息幂等处理。
延时队列设计
可以定义一个指定延时时长5s的topic(如: test_5s), 消费者在消费改队列的消息时,判断消息是否到期,未到期则放弃消费(即不提交offset), 反之则进行消费。
消息积压处理
当消息生产方生产速度过快,导致消费者处理不过来,积压了很多消息时,可以增加一个消费组将消息转发到另一个topic中(该topic设置的分区数较多),由该topic的消费者进行消费。
消息顺序消费
Kafka仅支持单分区内顺序消费。