前言

前面讲解了 Kafka 的生产者,而与生产对应的就是消费者,程序中可以通过 KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息。而 Kafka 中消费者比生产者多了个组的概念,也称消费者组,从而提升单机的消费速度。本文将介绍下消费者与消费者组的概念,然后再对客户端开发进行详细讲解。

一、消费者与消费者组

消费者负责订阅 Kafka 中的主题,并且从上面拉取消息,但与生产者不同的是它增加了消费者组的概念,这是因为很多时候 Kafka 的消费者在消费消息的时候经常会做一些高延时的动作,比如把数据写到数据库,读取数据进行计算处理等,这就相对于 producer 慢的多了,因此消费者组的增加是用来提升 Kafka 的消费能力而出现的,当同一个主题的消息再次过来的时候,这些消息就会被同一个消费者组的消费者来共同消费。

1.1 图解消费者模型

下面,我们来看看这个消费的过程。

情形一:

比如公司里有个打印服务,假设有 6 个打印分区,分别对应 彩印word、彩印excel、彩印ppt、黑白word、黑白excel、黑白ppt 六个分区内容,此时只要一台打印机。如下

情形二:

但是这个打印机打印的效果实在太慢了,很多人一天到晚都挤在打印室排队打印文档。这时,公司就新购了台打印机,让它们分别处理打印请求,可以把它们放在一个消费者组中,同时消费这些分区的数据。

此时它们就分别处理所分配到分区的数据,逻辑上彼此不干扰。同一个主题中的消息只会发布给消费者组中的一个消费者

情形三:

此时,公司想要加个打印备份功能,于是又采购了一个打印机,用来同步打印所有的打印文件。(ps :我也不知道这是什么奇葩公司,为了场景随便举的例子~)

如下:

这时候每个分区的数据都会发送到消费者组B 中即同一个分区的消息可以被不同消费者组的消费者消费

情形四:

此时,公司为了准备融资,给投资人秀秀自己的肌肉,于是又请购了五台打印机,这时候的场景如下:

虽然说消费者与消费者组这种模型可以让整体的消费能力具备横向伸缩性,但是对于分区固定的情况下,增加消费者并不一定能提升消费能力,如图所示,此时就有一台打印机无法分配到分区而消费不了数据。

1.2 消息投递模式

之前说过消息队列的两种模式,即点对点和发布订阅模式。而 Kafka 同时支持这两种模式。下面的这个理解很关键。

  • 点对点模式基于队列,类似于同一个消费者组中的数据,由生产者发送数据到分区,然后消费者拉取分区的消息进行消费,此时消息只能被同一个消费者组的消费者消费一次。
  • 发布订阅模式模式就是 kafka 中的分区消息可以被不同消费者组的消费者消费。这就是一对多的广播模式应用。

当然,消费者组是一个逻辑的概念,通过客户端参数 group.id 来配置,默认值为空字符串。而消费者并不是逻辑的概念,它是真正消费数据的实体,可以是线程、也可以是一个机器。

好,明白了消费者与消费者组的概念,接下来我们正式打开 消费者客户端的潘多拉魔盒。

二、Kafka 消费者的应用

同样,消费者也是依赖于 Kafak 的客户端,正常的消费逻辑是下面几个步骤:

  • 1、配置消费者客户端参数及创建相应的消费者实例
  • 2、订阅主题
  • 3、拉取消息并消费
  • 4、提交消费位移
  • 5、关闭消费者实例

这里的位移可能我们还不清楚是什么意思,别急,我们后面会讲到,先来看下一个典型的消费者它应该怎么写。

2.1 消费者客户端演示

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.81.101:9092");

        props.put("group.id", "test"); //消费者组
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
        consumer.subscribe(Arrays.asList("xiaolei2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

2.2 必要参数配置

在创建消费者的时候,Kafka 有 4 个参数 是必填的,比生产者多了一个。

  • bootstrap.servers : 这个参数用来指定连接 Kafka 集群的 broker 地址列表,可以是单个地址,也可以用逗号分割填上 Kafka 集群地址。
  • key.deserializer 和 value. deserializer :因为消息发送的时候将key 和 value 进行序列化生成字节数组,因此消费数据的时候需要反序列化为原来的数据。
  • group.id : 消费者所在组的名称,默认值为 ”“,如果设置为空,则会抛出异常 Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration. 复制代码

2.3 订阅主题与分区

在创建出 consumer 之后,我们需要为它订阅相关的主题,一个消费者可以订阅一个或多个主题。这里可以使用两个 API

  • consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
  • consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。

对于它订阅的是个集合,我们也容易理解,Kafka 可以通过正则表达式 来匹配相关主题,例如下面的这样:

consumer.subscribe(Pattern.compile("topic-.*"));

但是如果 consumer 重复定义的话,就以后面的为准,下面订阅的就是 xiaolei3 这个主题。

consumer.subscribe(Arrays.asList("xiaolei2"));
consumer.subscribe(Arrays.asList("xiaolei3"));

订阅完主题,我们讲讲它怎么定义分区。

直接订阅特定分区。

 consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));

这里面使用了 assing 方法来订阅特定分区。那如果不知道有哪些分区怎么办呢?

可以使用 KafkaConsumer 的 partitionsFor() 方法用来查询指定主题的元数据信息。

下面这种实现:

        consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("xiaolei2");
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
        }
        consumer.assign(topicPartitions);

最后,Kafka 中的消费是基于拉取式的,消息的消费分两种,

  • 一个是推送(push):服务端主动把消息发送给消费者,例如微信公众号文章的发送
  • 一个是拉取(poll):消费者主动向服务端发起请求获取。

Kafka 只需要轮询 API 向服务器定时请求数据,一旦消费者订阅了主题,轮询就会处理所有的细节,例如发送心跳、获取数据、分区再平衡等。而我们则处理业务即可。

三、消费位移

3.1 什么是偏移量

对于 Kafka 的分区来说,它的每条消息都有唯一的偏移量,用来展示消息在分区中对应的位置,它是一个单调递增的整数。在 0.9 版本之后 Kafka 的偏移量是存储在 Kafka 的 _consumer_offsets 主题中。消费者在消费完消息之后会向 这个主题中进行 消费位移的提交。消费者在重新启动的时候就会从新的消费位移处开始消费消息。

因为,位移提交是在消费完所有拉取到的消息之后才执行的,如果不能正确提交偏移量,就可能发生数据丢失或重复消费。

  • 如果在消费到 x+2 的时候发生异常,发生故障,在故障恢复后,重新拉取消息还是从 x处开始,那么之前 x到 x+2 的数据就重复消费了。
  • 如果在消费到 x+2 的时候,提前把 offset 提交了,此时消息还没有消费完,然后发生故障,等重启之后,就从新的 offset x+5 处开始消费,那么 x+2 到 x+5 中间的消息就丢失了。

因此,在什么时机提交 偏移量 显的尤为重要,在 Kafka 中位移的提交分为手动提交和自动提交,下面对这两种展示讲解。

3.2 自动提交偏移量

在 Kafka 中默认的消费位移的提交方式是 自动提交。这个在消费者客户端参数 enable.auto.commit 配置,默认为 true。它是定期向 _comsumer_offsets 中提交 poll 拉取下来的最大消息偏移量。定期时间在 auto.commit.interval.ms 配置,默认为 5s。

虽然自动提交消费位移的方式非常方便,让编码更加简洁,但是自动提交是存在问题的,就是我们上面说的数据丢失和重复消费,这两种它一个不落,因此,Kafka 提供了手动提交位移量,更加灵活的处理消费位移。

3.3 手动提交偏移量

开启手动提交位移的前提是需要关闭自动提交配置,将 enable.auto.commit 配置更改为 false。

根据用户需要,这个偏移量值可以是分为两类:

  • 常规的,手动提交拉取到的最大偏移量。
  • 手动提交固定值的偏移量。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

3.3.1 同步提交 offset

由于同步提交 offsets 有失败重试机制,故更加可靠。

public class CustomComsumer {
​
    public static void main(String[] args) {
​
        Properties props = new Properties();
​
        //Kafka集群
        props.put("bootstrap.servers", "hadoop102:9092"); 
​
        //消费者组,只要group.id相同,就属于同一个消费者组
        props.put("group.id", "test"); 
​
        props.put("enable.auto.commit", "false");//关闭自动提交offset
​
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
        consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
​
        while (true) {
            //消费者拉取数据
            ConsumerRecords<String, String> records = consumer.poll(100); 
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            
            //同步提交,当前线程会阻塞直到offset提交成功
            consumer.commitSync();
        }
    }
}

3.3.2 异步提交 offset

虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。

以下为异步提交offset的示例:

public class CustomConsumer {
​
    public static void main(String[] args) {
​
        Properties props = new Properties();
​
        //Kafka集群
        props.put("bootstrap.servers", "hadoop102:9092"); 
​
        //消费者组,只要group.id相同,就属于同一个消费者组
        props.put("group.id", "test"); 
​
        //关闭自动提交offset
        props.put("enable.auto.commit", "false");
​
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
​
            while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
​
            //异步提交
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for" + offsets);
                    }
                }
            }); 
        }
    }
}

异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待响应。

异步提交的时候同样有失败的情况出现,假设第一次提交了 100 的位移,但是提交失败了,第二次提交了 200 的位移,此时怎么处理?

如果重试,将 100 的位移再次提交,这次提交成功了,就会覆盖 200 的位移,此时变成 100。那么就会出现消费重复的情况,继续从100 处开始消费。

因此,基于这个原因,可以使用 同步 +异步的组合方式,在100 提交之后必须等待请求成功才能提交 200 的位移。

3.3.3 同步加异步提交

在正常的轮询中使用异步提交来保证吞吐量,但是在最后关闭消费者之前,或发生异常之后,此时使用同步提交的方式来保证最后的提交成功。这是在最后做的一次把关。

try {
    while (true) {
        // 拉取消息逻辑处理
        // 异步提交
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // 即将要关闭消费者,同步提交保证提交成功
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

3.4 指定位移消费

因为消费位移的存在,我们可以在消费者关闭、宕机重启、再平衡的时候找到存储的位移位置,开始消费,但是消费位移并不是一开始就有的,例如下面这几种情况:

  • 1、当一个新的消费者组建立的时候
  • 2、消费者组内的一个消费者订阅了一个新的主题;
  • 3、_comsumer_offsets 主题的位移信息过期被删除

这几种情况 Kafka 没办法找到 消费位移,就会根据 客户端参数 auto.offset.reset 的配置来决定从何处开始消费,默认为 latest

  • earliest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费;
  • latest:当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据(默认值);
  • none:当各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则直接抛出NoOffsetForPartitionException异常;

Kafka 的 auto.offset.reset 参数只能让我们粗粒度的从开头或末尾开始消费,并不能指定准确的位移开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,可以让我们提前消费和回溯消费,这样为消息的消费提供了很大的灵活性,seek()方法还可以通过 storeOffsetToDB 将消息位移保存在外部存储介质中,还可以配合再平衡***来提供更加精准的消费能力。

3.4.1 seek 指定位移消费

seek 方法定义如下:

public void seek(TopicPartition partition, long offset)
  • partition 表示分区
  • offset 表示从分区的哪个位置开始消费
afkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
​
        consumer.subscribe(Arrays.asList("xiaolei2"));
        
        consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> assignment = consumer.assignment();
        for (TopicPartition tp : assignment) {
            consumer.seek(tp,100);
        }
​
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的,也就是说,在执行 seek() 方法之前需要先执行一次 poll() 方法,等到分配到分区之后才可以重置消费位置。

因此,在poll()方法中设置一个时间等待分区完成,然后在通过 assignment()方法获取分区信息进行数据消费。

如果在 poll()方法中设置为0 那么就无法获取到分区。这个时间如果太长也会造成不必要的等待,下面看看优化的方案。

3.4.2 seek 指定位移消费优化

 consumer.subscribe(Arrays.asList("xiaolei2"));
​
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size()==0){
            consumer.poll(Duration.ofMillis(100));
            assignment=consumer.assignment();
        }
        for (TopicPartition tp : assignment) {
            consumer.seek(tp,100);
        }
​
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

3.4.3 seek 从分区开头或末尾消费

如果消费者组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则 auto.offset.reset 参数不会奏效。此时如果想指定从开头或末尾开始消费,也需要 seek() 方法来实现。

如果按照指定位移消费的话,就需要先获取每个分区的开头或末尾的 offset 了。可以使用 beginningOffsets() 和 endOffsets() 方法。

Set<TopicPartition> assignment = new HashSet<>();
// 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
// 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
while (assignment.size() == 0) {
    consumer.poll(100);
    // assignment()方法是用来获取消费者所分配到的分区消息的
    // assignment的值为:topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1
    assignment = consumer.assignment();
}
​
// 指定分区从头消费
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
    Long offset = beginOffsets.get(tp);
    System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
    consumer.seek(tp, offset);
}
​
// 指定分区从末尾消费
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
    Long offset = endOffsets.get(tp);
    System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
    consumer.seek(tp, offset);
}
​
// 再次执行poll()方法,消费拉取到的数据。
// ...(省略)

其实,KafkaConsumer 中直接提供了 seekToBeginning() 和 seekToEnd() 方法来实现上述功能。具体定义如下:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions) 

替代代码如下:

Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
    Long offset = beginOffsets.get(tp);
    System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
    consumer.seek(tp, offset);
}

3.4.5 根据时间戳消费

比如,我们要消费前天这时刻的消息,此时就无法直接追溯到这个位置了,这时可以使用 KafkaConsumer 的 offsetsForTimes 方法

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)

offsetsForTimes() 方法的参数 timestampsToSearch 是一个 Map 类型,其中 key 为待查询的分区,value 为待查询的时间戳,该方***返回时间戳大于等于查询时间的第一条消息对应的 offset 和 timestamp 。

接下来就以消费当前时间前一天之后的消息为例,代码如下:

Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
    consumer.poll(100);
    assignment = consumer.assignment();
}
​
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
    // 设置查询分区时间戳的条件:获取当前时间前一天之后的消息
    timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
}
​
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
​
for(TopicPartition tp: assignment){
    OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
    // 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
    if (offsetAndTimestamp != null) {
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
}
​
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
    // 消费记录
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ":" + record.value() + ":" + record.partition() + ":" + record.timestamp());
    }
}

四、控制或关闭消费

KafkaConsumer 提供了对消费速度进行控制的方法,某些时刻,我们可能会关闭或暂停某个分区的消费,而先消费其他分区,当达到一定条件时再恢复这些分区的消费,这两个方法是 pause() (暂停消费) 和 resume()(恢复消费)。

    public void pause(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
​
        try {
            this.log.debug("Pausing partitions {}", partitions);
            Iterator var2 = partitions.iterator();
​
            while(var2.hasNext()) {
                TopicPartition partition = (TopicPartition)var2.next();
                this.subscriptions.pause(partition);
            }
        } finally {
            this.release();
        }
    }
 public void resume(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
​
        try {
            this.log.debug("Resuming partitions {}", partitions);
            Iterator var2 = partitions.iterator();
​
            while(var2.hasNext()) {
                TopicPartition partition = (TopicPartition)var2.next();
                this.subscriptions.resume(partition);
            }
        } finally {
            this.release();
        }
​
    }

除了暂停和恢复之外,Kafka 还提供了午餐的 paused() 方法来返回暂停的分区集合。

public Set<TopicPartition> paused()

五、再平衡

再平衡是指分区的所有权从一个消费者转移到另一消费者的行为,例如新增消费者的时候,再平衡会导致分区与消费者的重新划分,为消费者组提供了高可用和伸缩性保障。

再平衡发生的时候,消费者组内的消费者是无法读取消息的,也就是说,在再平衡发生期间的这一小段时间内,消费者会变得不可用。另外,再平衡也可能会造成消息重复,因为当一个分区被分配到另一个消费者时,消费者当时的状态会丢失,此时还未来得及将消费位移同步,新的消费者就会从原先的位移开始消费,因此,尽量要避免再平衡的发生。

我们可以使用 subscribe 的重载方法传入自定义的分区再平衡***

 /*订阅指定集合内的所有主题*/
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
 /*使用正则匹配需要订阅的主题*/    
subscribe(Pattern pattern, ConsumerRebalanceListener listener)    

代码如下:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
​
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    /*该方***在消费者停止读取消息之后,再均衡开始之前就调用*/
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("再均衡即将触发");
        // 提交已经处理的偏移量
        consumer.commitSync(currentOffsets);
        // 清除局部变量
        currentOffsets.clear();
    }
​
    /*该方***在重新分配分区之后,消费者开始读取消息之前被调用*/
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
​
    }
});
​
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
            //TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加
            currentOffsets.put(topicPartition, offsetAndMetadata);
        }
        consumer.commitAsync(currentOffsets, null);
    }
} finally {
    consumer.close();
}

代码中将消息位移暂存在 局部变量 currentOffsets 中,在正常消费时候可以通过 异步提交消费位移,但在发生再平衡动作之前,对onPartitionsRevoked 回调函数进行同步提交,从而避免再平衡的重复消费。

六、拦截器

与生产者客户端拦截器机制一样,kafka消费者客户端中也定义了拦截器逻辑,通过实现ConsumerInterceptor来实现自定义拦截器逻辑,ConsumerInterceptor主要有三个方法:

  • public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) consumer会在poll方法返回之前调用此方法,来对消息进行定制化的操作,比如修改消息内容,按照一定规则过滤消息等。
  • public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) consumer会在提交消费位移之后调用此方法,可以在此方法中跟踪提交位移的相关信息。
  • public void close() :关闭
public class ConsumerInterceptorPrefix implements ConsumerInterceptor<String,String> {
    @Override
    public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition partition : consumerRecords.partitions()) {
            List<ConsumerRecord<String, String>> recs = consumerRecords.records(partition);
            List<ConsumerRecord<String, String>> newRecs = new ArrayList<>();
            for(ConsumerRecord<String,String> rec:recs){
                String newValue = "xiaolei-"+rec.value();
                ConsumerRecord<String,String> newRec = new ConsumerRecord<>(rec.topic(),
                        rec.partition(),rec.offset(),rec.key(),newValue);
                newRecs.add(newRec);
            }
            newRecords.put(partition,newRecs);
        }
        return new ConsumerRecords<>(newRecords);
    }
​
    @Override
    public void close() {
​
    }
​
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((tp,offsetAndMetadata) -> {
            System.out.println(tp+" : "+offsetAndMetadata.offset());
        });
    }
​
    @Override
    public void configure(Map<String, ?> map) {
​
    }
}

在配置类添加拦截器

props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorPrefix.class.getName());

七、重要的消费者参数

7.1 fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候就不需要来来回回地处理消息。如果没有很多可用的数据,但消费者的CPU使用率却很高,可以将此属性值设置的比默认值大。如果消费者的数量较多,把该属性值的值设置的大一点可以降低broker的工作负载。

7.2 fetch.max.wait.ms

该属性指定broker返回消息的等待时间,默认是500ms。如果没有足够的数据流入kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。如果要降低潜在的延迟(为了满足SLA),可以把该参数值设置的小一些。如果fetch.max.wait.ms被设为100ms,并且fetch.min.bytes被设为1MB,kafka在收到消费者的请求后,要么返回1MB的数据,要么在100ms后返回可用的数据,只要有一个条件满足了,就会立马返回。

7.3 max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB。KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。

max.partition.fetch.bytes的值必须比broker能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。

在设置此值时,还需要考虑消费者处理数据的时间。消费者需要频繁的调用poll()方法来避免会话过期和发生分区的再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。出现这种情况,可以把max.partition.fetch.bytes改小,或者延长会话过期时间。

7.4 session.timeout.ms

该属性值指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其它消费者。heartbeat.interval.ms指定了poll()方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。

session.timeout.ms调小:可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。

session.timeout.ms调大:可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

7.5 auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是latest,偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,偏移量无效的情况下,消费者将从起始位置读取分区的记录。

7.6 enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。

7.7 partition.assignment.strategy

分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。kafka有两个默认的分配策略

Range(默认):该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1和C2同时订阅了主题T1和主题T2,并且每个主题有3个分区。那么消费者C1有可能分配到这两个主题的分区0和分区1,四个分区;而消费者C2分配到这两个主题的分区2,两个分区。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

org.apache.kafka.clients.consumer.RangeAssignor

RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin策略来给消费者C1和消费者C2分配分区,那么消费者C1将分到主题T1的分区0和分区2以及T2主题的分区1;消费者C2将分配到主题T1的分区1以及主体T2的分区0和分区2.一般来说,如果所有消费者都订阅相同的主题,RoundRobin策略会给所有消费者分配相同数量的分区(最多差一个分区)。

org.apache.kafka.clients.consumer.RoundRobinAssignor

7.8 client.id

该属性可以是任意字符串,broker用它来标记从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

7.9 max.poll.records

该属性用于控制单次调用poll()方法能够返回的记录数量,可以控制在轮询里需要处理的数据量。

7.10 receive.buffer.bytes和send.buffer.bytes

socket在读写数据时用到的TCP缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。