Kafka配置

生产者常见配置项

acks(默认值1)

  • acks=0 生产者不会等待broker的任何回复,这种情况下不能保证消息被broker接收。
  • acks=1 生产者只会等待leader节点将数据写入到本地log,不关心follower是否成功同步到消息。若此时leader发生故障且没有将新消息同步给follower,新消息会丢失。
  • acks=-1 or acks=all 生产者需要等待leader同步消息给相应数量的follower,该数量由min.insync.replicas配置(默认为1,推荐配置大于等于2)决定。

retries

设置失败重试次数,失败重试一定程度上可以保证消息发送的可靠性,也有可能出现消息重复发送(如网络抖动,生产者未收到broker的响应,实际消息已成功发送),此时需要注意消息的幂等性,避免重复消费。

retry.backoff.ms

重试间隔时间,默认100ms

buffer.memory

设置本地缓冲区大小,默认值33554432字节(32M)。生产者发送消息时,会先将消息发送到本地缓冲区,再由本地线程从缓冲区读取数据批量发送给broker。

batch.size

批量发送消息的大小,默认值16384字节(16K)。当从缓冲区读取数据到达16k时即发送。

linger.ms

消息徘徊时间,默认值0,表示消息必须立即发送。若超过徘徊时间还未能从缓冲区读取到16k的数据,这些数据也必须发送。

消费者常见配置项

enable.auto.commit

设置是否自动提交,默认值true。消费者拉取到消息后自动提交偏移量。

auto.commit.interval.ms

自动提交间隔,默认值5000,表示自动提交任务每隔5秒执行一次。

自动提交可能引起的问题:

  1. 重复消费:若读取到消息后,业务代码的执行小于自动提交间隔时间,若在业务逻辑已处理完成后,还未提交偏移量,此时消费者故障重启后,还会拉取到故障时间点之前的历史数据,也就是重复消费问题。
  2. 丢消息: 若读取到消息后,业务代码的执行时间大于自动提交间隔时间且过了自动提交时间点,消息还在逐条处理,此时消费端故障重启后就拉取不到故障时间点之后的历史消息了,也就是消息丢失了。

auto.offset.reset

设置新加入的消费组从哪开始消费。

  • latest 默认值,只会消费启动后的新消息
  • earliest 启动时从头消费

max.poll.records

设置消费者一次最多拉取的消息个数。默认值500

max.poll.interval.ms

若消费者poll间隔时间超过该值,broker会认为消费者处理能力弱并将消费者移出消费组,分区数据分配给别的消费者消费。默认值300000(5分钟)

session.timeout.ms

设置超时时间,消费者超过该时间后还未发送心跳,broker也会将消费者移出消费组,分区数据分配给别的消费者消费,默认值10000(10秒)。

heartbeat.interval.ms

设置发送心跳的间隔时间,默认值3000(3秒)

消费者常见消费方式

指定主题消费

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // 手动同步提交
                consumer.commitSync();
            }
        }
    }
}

消费指定分区

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // 手动同步提交
                consumer.commitSync();
            }
        }
    }
}

从头开始消费

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToBeginning(Arrays.asList((topicPartition)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // 手动同步提交
                consumer.commitSync();
            }
        }
    }
}

从指定偏移量位置开始消费

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seek(topicPartition, 2);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            if (records.count() > 0) {
                // 手动同步提交
                consumer.commitSync();
            }
        }
    }
}

指定时间点消费

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 一小时前
        long oneHourOffsetTime = System.currentTimeMillis() - 1000 * 60 * 60;
        // 获取指定topic的分区列表
        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(TOPIC_NAME);
        Map<TopicPartition, Long> offsetMap = new HashMap<>();
        for (PartitionInfo partitionInfo : partitionInfoList) {
            offsetMap.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()), oneHourOffsetTime);
        }
        // 获取每个分区对应时间点的offset
        Map<TopicPartition, OffsetAndTimestamp> timestampMap = consumer.offsetsForTimes(offsetMap);
        // 根据获取到的offset,再指定offset消费每个分区
        timestampMap.forEach((topicPartition, offsetAndTimestamp) -> {
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seek(topicPartition, offsetAndTimestamp.offset());
            // 消费逻辑省略...
        });
    }
}

Kafka的日志存储

Kafka日志存储在log.dirs配置的路径下, 一个分区一个目录,目录以TOPIC+PARTITION方式命名,如下图: kafkaLog 消息日志存储是分段的,且每段日志都会生成一个日志文件, 可以通过log.segment.bytes参数设置每段的大小(最大1G)。

# 存储一部分消息的offset值,kafka每往分区发4k(默认值,支持配置)消息时就会写一次offset到index文件
# 所以在通过offset定位消息时,先经过index定位一次,再去log文件中查具体的消息。
# 稀疏索引
00000000000000000000.index
# 存储对应分区的offset和消息
00000000000000000000.log
# 也是用于存储一部分消息的offset值,kafka每往分区发4k(默认值,支持配置)消息时就会将当前消息的发送时间和offset写入到timeindex文件中。
# 所以在通过时间来定位消息时,会先经过timeindex文件查找offset。
00000000000000000000.timeindex

00000000000000000006.index
00000000000000000006.log
00000000000000000006.timeindex

常见问题及解决方案

消息丢失问题

  • 生产方 通过设置acks可以一定程度上解决消息丢失问题,[0,1,-1]数据保证级别从低到高。
  • 消费方 使用手动提交offset。

消息重复问题

  • 生产方 因网络抖动,生产方重试机制导致消息重复发送。需要做消息幂等处理。
  • 消费方 消费方拉取到消息后并且消费完毕,此时消费者服务宕机,未提交offset,下次重启后,还会消费到相同的消息。也需要做消息幂等处理。

延时队列设计

可以定义一个指定延时时长5s的topic(如: test_5s), 消费者在消费改队列的消息时,判断消息是否到期,未到期则放弃消费(即不提交offset), 反之则进行消费。

消息积压处理

当消息生产方生产速度过快,导致消费者处理不过来,积压了很多消息时,可以增加一个消费组将消息转发到另一个topic中(该topic设置的分区数较多),由该topic的消费者进行消费。

消息顺序消费

Kafka仅支持单分区内顺序消费。