在这篇博客中,我介绍了在Java中定义Kafka消费者的各种方法。Spring、Micronaut、Vert.x和Akka Streams在引擎盖下使用kafka-clients库,并提供完整的功能集来消费Kafka消息。
Kafka 是一个著名的事件流平台。我们在很多项目中使用它。没什么不寻常的——工具很棒。各种框架和库提供与 Kafka 的集成。在这篇文章中,我想介绍其中一些 Java 语言,看看我们如何创建一个客户实例,用来读取Kafka消息:
连接到 Kafka 的第一种方法是使用 kafka-clients 库 中的KafkaConsumer类。其他库或框架集成通常使用该库。在本节中,我将重点介绍直接使用它。虽然它非常简单,但我们需要付出一些努力来提高效率。
首先,我们希望我们的消费者持续工作。因此,我们将在单独的线程中运行它,我们需要自己管理它。此外,我们需要将轮询放入不定式循环中。
另一件事是关闭消费者,这可能很棘手。我们可以关闭线程并使用超时来关闭套接字和网络连接。然而,采用这种方法,我们错过了两个要点:
- 显式关闭消费者会立即触发重新平衡,因为组协调员没有发现消费者因丢失心跳而离开。
- 该操作也会完成待处理的偏移量提交。因此,在再次运行消费者之后,我们不会两次消费某些消息。
接下来,如果我们想并行消费消息,我们需要提供一个自定义的解决方案来在同一个消费者组中运行特定数量的消费者。然而,每个消费者都需要两个线程——一个用于轮询,另一个用于心跳。
在消息的批量消费方面,我们在轮询一个队列后得到一个记录集合(可能为空)。因此,我们不必提供任何特定的配置或机制。
当我们想流式传输接收到的数据时,我们可以使用 JDK 的 Stream API。但是如果我们想 并行使用它们, 情况就会变得复杂。更复杂的代码变得更容易出错。
默认情况下,消费者会自动提交偏移量。但是,我们可以更改这一点并手动完成工作。API 为我们提供了几种同步或 异步 调用操作的方法。此外,我们可以提交从队列上最后一次轮询收到的所有消息的所有偏移量,或者提供特定的主题分区值。
使用普通的 Kafka 消费者,我们处理ConsumerRecord包含消息本身及其元数据的实例。它本身并不是一个缺点。但是,如果我们想解析它,我们需要提供我们的机制。
所以,总的来说,我会谨慎使用这种方法,而是考虑其他可用的可能性。那么,让我们看看如何在一些框架或工具包中使用 Kafka。
Spring Boot
当您在项目中使用 Spring Boot 时,您可以 使用 Spring for Kafka 集成。它提供了一种方便的***机制来实现对 Kafka 消息的消费。
我们可以通过两种方式消费消息:
- 使用消息侦听器的容器,
- 或通过提供带有@KafkaListener注释的类。
当我们想使用消息侦听器方法时,我们需要提供两种类型的容器之一来运行我们的侦听器:
- KafkaMessageListenerContainer— 在单个线程上为容器配置中提供的所有主题提供消息消费,
- ConcurrentMessageListenerContainerKafkaMessageeListenerContainer— 允许在多线程环境中使用消息,为每个线程提供一个消息。
容器具有丰富的 API,允许我们设置各种配置参数(如线程、批处理、确认、错误处理程序等)。重要的是要设置一个***类——一个消息驱动的 POJO。MessageListener它是orBatchMessageListener接口的一个实例。两者都是基本的,允许我们使用类型化的 ConsumerRecord 实例。Spring 还提供了 其他更复杂的接口 。
然而,在 Spring 中使用 Kafka 消息最直接的方法是使用@KafkaListener注解实现一个 bean。处理接收到的消息的方法的签名可能会有所不同。您将使用的输入参数取决于您的需要,并且有很多可能性(有关详细信息,请查看注释 javadocs)。在启动时,Spring 会查找注解使用情况(带有注解的类必须是 Spring 组件)并创建运行在侦听器中定义的逻辑的 Kafka 消费者。
@Component <b>public</b> <b>class</b> KafkaListenerConsumer { @KafkaListener(topics = <font>"${spring.kafka.consumer.topic}"</font><font>, groupId = </font><font>"${spring.kafka.consumer.group-id}"</font><font>) <b>public</b> <b>void</b> processMessage(List<Message<String>> content) { </font><font><i>// processing logic comes here</i></font><font> } } </font>
默认情况下,@KafkaListener在单个线程中运行——我们不会并行使用来自主题分区的消息。但是,我们可以通过两种方式改变这种行为。
第一个是定义concurrency注解的参数,我们可以在其中设置给定侦听器正在使用的线程数。
@Component <b>public</b> <b>class</b> KafkaListenerConsumer { @KafkaListener( concurrency = <font>"2"</font><font>, topics = </font><font>"${spring.kafka.consumer.topic}"</font><font>, groupId = </font><font>"${spring.kafka.consumer.group-id}"</font><font>) <b>public</b> <b>void</b> processMessage(List<Message<String>> content) { </font><font><i>// processing logic comes here</i></font><font> } } </font>
第二个选项是为containerFactory参数提供一个值。它是生产用于运行侦听器逻辑的容器的容器工厂 bean 的名称。当工厂不是单线程时(并发设置为大于 1 的值),框架将容器线程分配给分区。
@Bean ConcurrentKafkaListenerContainerFactory<String, String> multiThreadedListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = <b>new</b> ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); <b>return</b> factory; }
在这两种情况下,如果我们的线程数多于分区数,则有些线程仍处于空闲状态。
这还没有结束——我们甚至可以使用topicPartitions参数为特定分区指定侦听器方法。有了这样的解决方案,Spring 会自动在单独的线程中运行每一个。
@Component <b>public</b> <b>class</b> PartitionedKafkaListenerConsumer { @KafkaListener( clientIdPrefix = <font>"part0"</font><font>, topics = </font><font>"${spring.kafka.consumer.topic}"</font><font>, groupId = </font><font>"${spring.kafka.consumer.group-id}"</font><font>, topicPartitions = { @TopicPartition(topic = </font><font>"${spring.kafka.consumer.topic}"</font><font>, partitions = {</font><font>"0"</font><font>}) }) <b>public</b> <b>void</b> partition0(ConsumerRecord<String, String> content) { </font><font><i>// processing logic comes here</i></font><font> } @KafkaListener( clientIdPrefix = </font><font>"part1"</font><font>, topics = </font><font>"${spring.kafka.consumer.topic}"</font><font>, groupId = </font><font>"${spring.kafka.consumer.group-id}"</font><font>, topicPartitions = { @TopicPartition(topic = </font><font>"${spring.kafka.consumer.topic}"</font><font>, partitions = {</font><font>"1"</font><font>}) }) <b>public</b> <b>void</b> partition1(ConsumerRecord<String, String> content) { </font><font><i>// processing logic comes here</i></font><font> } } </font>
Spring for Kafka 也提供了批量消费消息的功能。当然,我们有不止一种选择。
第一个是容器工厂中批处理的配置开关。启用后,我们可以提供一个接受消息列表的侦听器。重要的是——我们需要使用批处理容器作为KafkaListener 注解中的containerFactory参数值。
一个选项使用带有前缀的 消息侦听器接口。 Batch他们接受消费者记录列表而不是单个记录。
当涉及到手动提交消息偏移量时,我们有同样丰富的选择。首先,我们有原始的 Kafka 设置,即 `enable.auto.commit`。当它为真时,Kafka 根据其配置提交所有消息。 否则,将根据配置中设置的确认模式 的值来选择负责提交的实体。对于 ack 设置为MANUALor MANUAL_IMMEDIATE,由开发人员提交偏移量。对于所有其他值,由容器来运行它。此外,我们可以指定提交操作的同步性。
当我们使用手动提交时,我们可以Acknowledgment在框架的一些消息***中使用该类。该接口提供了调用已处理消息的提交操作或丢弃上次轮询的剩余记录的方法。
Spring for Kafka 让我印象深刻的是设置工作 Kafka 消费者的方法的数量。我们可以通过多种方式做到这一点,这很好,因为框架的弹性。但是,当我们迷失在各种可用选项中时,它也可能是有害的。
Micronaut
与 Spring 一样,Micronaut 框架与 Kafka 进行了专门的集成,并且也适用于消息驱动的 POJO。消费者的配置甚至类似。
我们从@KafkaListener类级别的注释开始。这是我们定义一组消费者的地方。这样的组是基于为具有给定 groupId 的特定组提供默认值或值的配置文件内容配置的。我们可以使用注释参数覆盖这些值。
侦听器类的每个公共或包私有方法,用@Topic(提供强制性主题名称/模式)注释,成为在后台运行的 Kafka 消费者。我们也可以将注释放在类级别,但所有公共或私有包方法都成为 Kafka 消费者。所以我们需要小心这个。
@KafkaListener(groupId = <font>"micronaut-group"</font><font>, clientId = </font><font>"${kafka.consumers.micronaut-group.client-id}"</font><font>) <b>public</b> <b>class</b> MicronautListener { @Topic(</font><font>"${kafka.consumers.micronaut-group.topic}"</font><font>) <b>void</b> receive(@KafkaKey String key, String value) { </font><font><i>// processing logic comes here</i></font><font> } } </font>
要设置并发消息处理,我们可以将线程数定义为@KafkaListener注释参数。如果我们提供的线程数少于分区数,一些消费者将处理来自两个或更多分区的消息。另一方面,如果我们设置更多它们,一些会保持闲置,什么也不做。这与 Spring 集成中的行为相同。
@KafkaListener(groupId = <font>"micronaut-group"</font><font>, clientId = </font><font>"${kafka.consumers.micronaut-group.client-id}"</font><font>, threads = 5) <b>public</b> <b>class</b> MultithreadedMicronautListener { @Topic(</font><font>"${kafka.consumers.micronaut-group.topic}"</font><font>) <b>void</b> receive(@KafkaKey String key, String value, <b>int</b> partition) { <b>switch</b> (partition) { <b>case</b> 0: </font><font><i>// processing logic comes here</i></font><font> <b>break</b>; <b>case</b> 1: </font><font><i>// processing logic comes here</i></font><font> <b>break</b>; <b>case</b> 2: </font><font><i>// processing logic comes here</i></font><font> <b>break</b>; <b>default</b>: log.error(</font><font>"Message (key {}, value {}) from unexpected partition ({}) received."</font><font>, key, value, partition); } } } </font>
同样,我们可以使用注释上的batch参数启用批处理。
然后我们可以在消费者方法中消费一个记录列表(或领域类)。
@KafkaListener(groupId = <font>"micronaut-group"</font><font>, clientId = </font><font>"${kafka.consumers.micronaut-group.client-id}"</font><font>, batch = <b>true</b>) <b>public</b> <b>class</b> BatchedMicronautListener { @Topic(</font><font>"${kafka.consumers.micronaut-group.topic}"</font><font>) <b>void</b> receive(List<ConsumerRecord<String, String>> records) { log.info(</font><font>"Batch received: {}"</font><font>, records.size()); records.forEach(rec -> </font><font><i>// processing logic comes here</i></font><font> ); } } </font>
偏移提交的 Micronaut 管理提供了一些选项。我们通过 `OffsetStrategy` 枚举定义使用哪一个。您可以在 框架文档 中找到对它的出色描述。这是处理记录后使用手动提交的示例:
Micronaut 中 Kafka 类的配置比 Spring 中的配置更加简洁。在维护代码方面,更改或更新的地方更少了。但是,与 Spring 不同,我们不能以编程方式定义消费者,而无需使用注释。
Akka Stream
下一个客户端是带有 Alpakka 连接器 [url=https://doc.akka.io/docs/akka/current/stream/index.html]的 Akka Streams[/url]库。
此设置提供了一种使用来自 Kafka 的消息的反应方式。它在底层使用了 Akka Actors 框架。
在这里,我们将 Kafka 消费者作为事件源的实例。在提供的数据类型、提供的元数据和分区信息以及处理偏移提交的方式方面有所不同。
让我们从Consumer.plainSource. ConsumerRecord它在单个线程中为整个主题发出消息,保留给定分区的消息消费顺序。根据 Kafka 消费者配置,流可以自动提交已处理的记录。
Consumer .plainSource(consumerSettings, Subscriptions.topics(topicName)) .map(consumerRecord -> { <font><i>// processing logic comes here</i></font><font> <b>return</b> consumerRecord; }) .runWith(Sink.ignore(), materializer) .toCompletableFuture() .handle(AppSupport.doneHandler()) .join(); </font>
我们也可以选择手动提交消息。如果是这样,我们需要使用提供消费者记录和有关当前偏移量信息的可提交源之一。在处理完一条消息后,我们可以利用额外的数据来调用一个Committer实例来进行手动提交。
Consumer .committableSource(consumerSettings, Subscriptions.topics(topicName)) .map(committableMessage -> { <font><i>// processing logic comes here</i></font><font> <b>return</b> committableMessage; }) .mapAsync(maxParallelism, msg -> CompletableFuture.completedFuture(msg.committableOffset())) .runWith(Committer.sink(CommitterSettings.create(committerSettings)), materializer) .toCompletableFuture() .handle(AppSupport.doneHandler()) .join(); </font>
关于并行处理事件,该库也提供了出色的工具。最简单的解决方案是使用普通分区源,它发出记录源以及主题分区信息。当我们使用来自子源的消息时,该操作在每个分区的单独线程上运行。但是,我们可以使用分区信息以自定义方式分配线程分配(我们需要使用flatMapMerge和groupBy运算符)。
Consumer .plainPartitionedSource(consumerSettings, Subscriptions.topics(topicName)) .mapAsync(maxPartitions, pair -> { Source<ConsumerRecord<String, String>, NotUsed> source = pair.second(); <b>return</b> source .map(record -> { <font><i>// processing logic comes here</i></font><font> <b>return</b> record; }) .runWith(Sink.ignore(), materializer); }) .runWith(Sink.ignore(), materializer) .toCompletableFuture() .handle(AppSupport.doneHandler()) .join(); </font>
并行使用数据时最重要的是结果的顺序。我们有两个选择。第一个是使用mapAsync运算符。它使用参数中指定大小的线程池。该阶段确保下游发出消息的顺序,但不保证处理顺序。另一方面——假设发出消息的顺序对我们来说并不重要。在这种情况下,我们可以使用mapAsyncUnordered操作符——它会在处理完成后向下游传递消息,而不管接收顺序如何。
批处理也可用。我们可以通过使用类似groupedor的批处理操作符batch来实现它。在这种情况下,我们需要使用一个CommittableOffsetBatch实例并使用批处理中最后处理的消息的偏移量对其进行更新。然后,我们需要在流程的下一步中调用 commit。
Akka Streams 对 Kafka 的支持令人惊叹。它将消息作为数据流来消费,这是最适合 Kafka 消费者的方式。由于消息源的粒度,我们可以轻松地为我们的案例选择最合适的一个。通过利用 Streams 的强大 API,我们可以非常快速地获得批处理或背压等功能。使用 Akka Streams 时对我来说最大的缺点是操作符的丰富性。您可能需要一些时间来熟悉它。但是,当您查看 连接器源代码 时,您会发现许多如何将 Streams 与 Kafka 一起使用的示例。
Vertx
介绍的最后一种实现消费来自 Kafka 的消息的方法是使用 Vert.x 工具包 。该方法类似于 Akka Streams 的方法——它适用于 Verticle,一种轻量级 Actor 的形式。verticles 使用事件总线在彼此之间传递消息。它们可以在事件循环和工作线程上运行。
核心库提供基本功能(如在 Akka Streams 中),我们需要使用外部组件来连接 Kafka,即 vertx-kafka-client 。虽然一般假设与 Akka Streams 中的假设非常相似,但使用代码看起来不同。
Vert.x 应用程序使用事件循环和工作线程。前者将事件传递到目标顶点,并且可以运行快速、非阻塞的代码。后者的目的是完成繁重的工作,例如 I/O 或昂贵的计算。因此,我们应该考虑使用工作线程来消费 Kafka 消息,这样循环不会被阻塞,应用程序运行顺畅。示例代码包含作为工作人员运行的 Kafka verticles。
对,那么我们如何创建 Kafka 消息的消费者呢?Vert.x 客户端为此提供了一个类 — KafkaConsumer. 它提供了几种工厂方法,用于根据提供的配置创建实例。
有了消费者,我们需要在启动顶点之前订阅一些 Kafka 主题。我们可以从 subscribe 方法的几个变体中进行选择。调用其中之一使顶点能够从单个或多个主题中读取数据。下一步是注册处理函数,使用接收到的消息。所有这一切,我们都是通过在消费者上使用流畅的 API 来完成的。
这是为一个或多个主题创建普通消费者的方式,没有分区拆分到不同的线程。正如您在示例项目中看到的那样,我已经封装了 verticle 的逻辑并将其部署为 worker verticle。使用此解决方案,所有消息都将由同一个工作线程使用。
<b>class</b> KafkaVerticle <b>extends</b> AbstractVerticle { <font><i>// initialization</i></font><font> @Override <b>public</b> <b>void</b> start() { KafkaConsumer.create(vertx, kafkaConfig) .subscribe(topic) .handler(record -> </font><font><i>// processing logic comes here</i></font><font> ) .endHandler(v -> log.info(</font><font>"End of data. Topic: {}"</font><font>, <b>this</b>.topic)) .exceptionHandler(e -> log.error(</font><font>"Single Kafka consumer error"</font><font>, e)); } } </font>
根据 Kafka 配置,消费者可能会自动提交偏移量。但是手动触发动作呢?Vert.x 消费者提供了完成这项工作的提交方法。我们可以将它们称为单个消息或特定主题和分区的一组偏移量。
在为给定主题的所有分区创建消费者时,我们需要手动设置Vertx。
首先,我们需要知道我们想要处理哪些主题的多少个分区。我们可以使用KafkaAdminClient来描述我们感兴趣的话题。接下来,我们需要为每个主题-分区对创建一个包含 Kafka 消费者的专用线程。在线程内部,我们将消费者分配给所需的主题分区数据并指定处理程序,就像在“普通”消费者线程中一样。
根据Kafka的配置,消费者可能会自动提交offsets。但是手动触发动作呢?Vert.x消费者提供了做这个工作的提交方法。我们可以为单个消息或者为特定主题和分区的一堆偏移量调用它们。
当涉及到为某一主题的所有分区创建消费者时。
我们需要手动设置Vertx。首先,我们需要知道我们想处理哪些主题的多少个分区。我们可以调用KafkaAdminClient来描述我们感兴趣的主题。接下来,我们需要为每个主题分区对创建一个专用Vertx线程,包含Kafka消费者。在顶点内,我们将消费者分配给所需的主题分区数据,并像 "普通 "消费者顶点那样指定处理程序。
<font><i>// create required number of vertices</i></font><font> IntStream.range(0, numberOfPartitions) .forEach(partition -> { vertx.deployVerticle( () -> KafkaPartitionedVerticle.create(topic, partition, kafkaConfig), deploymentOptions, async -> log.info(</font><font>"Partitioned Kafka consumer deployed. DeploymentId: {}"</font><font>, async.result()) ); }); </font><font><i>// inside KafkaPartitionedVerticle</i></font><font> <b>class</b> KafkaPartitionedVerticle <b>extends</b> AbstractVerticle { </font><font><i>// initialization</i></font><font> @Override <b>public</b> <b>void</b> start() { KafkaConsumer.create(vertx, kafkaConfig) .assign(<b>new</b> TopicPartition(topic, partition), AsyncResult::result) .handler(record -> </font><font><i>// processing logic comes here</i></font><font> ) .endHandler(v -> log.info(</font><font>"End of data. Topic: {}, partition: {}"</font><font>, <b>this</b>.topic, <b>this</b>.partition)) .exceptionHandler(e -> log.error(</font><font>"Partitioned Kafka consumer error"</font><font>, e)); } } </font>
正如我上面提到的,每个Kafka消费者都使用一个专门的处理器来处理收到的消息。根据我们的需要,我们可以一条一条地消费记录,也可以分批消费。在前一种情况下,我们使用handler方法定义一个函数,而对于后者,我们使用 batchHandler方法。Kafka组件分别以KafkaConsumerRecord或KafkaConsumerRecords的形式提供记录。在这两种情况下,在引擎盖下,我们可以找到一个好的旧ConsumerRecord实例。
总结
哪一个更优越呢?我想说没有,除了一种情况。
如果你有一个项目考虑直接使用kafka-clients库,我会建议你不要这样做。这个库是一种连接到Kafka的驱动。在项目中使用它,我们需要接受的是,在某种程度上,我们将重新发明已经在其他工具中实现的轮子。
如果你的项目中的框架不限制你,我建议使用Akka Streams。
否则,寻找已经存在的集成。如果你的服务是在Spring框架内开发的,那么应用Akka Streams是没有意义的。
或者在Vert.x应用程序中使用Micronaut?