前言

kafka 是一个消息队列产品,基于 Topic partitions 的设计,能达到非常高的消息发送处理性能。Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。

简单集成

引入依赖

  1. <dependency>

  2. <groupId>org.springframework.kafka</groupId>

  3. <artifactId>spring-kafka</artifactId>

  4. <version>2.2.6.RELEASE</version>

  5. </dependency>

添加配置

  1. spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

测试发送和接收

  1. /**

  2. * @author: kl @kailing.pub

  3. * @date: 2019/5/30

  4. */

  5. @SpringBootApplication

  6. @RestController

  7. public class Application {

  8.  

  9. private final Logger logger = LoggerFactory.getLogger(Application.class);

  10.  

  11. public static void main(String[] args) {

  12. SpringApplication.run(Application.class, args);

  13. }

  14.  

  15. @Autowired

  16. private KafkaTemplate<Object, Object> template;

  17.  

  18. @GetMapping("/send/{input}")

  19. public void sendFoo(@PathVariable String input) {

  20. this.template.send("topic_input", input);

  21. }

  22. @KafkaListener(id = "webGroup", topics = "topic_input")

  23. public void listen(String input) {

  24. logger.info("input value: {}" , input);

  25. }

  26. }

启动应用后,在浏览器中输入:http://localhost:8080/send/kl。就可以在控制台看到有日志输出了:input value: "kl"。基础的使用就这么简单。发送消息时注入一个 KafkaTemplate,接收消息时添加一个 @KafkaListener 注解即可。

Spring-kafka-test 嵌入式 Kafka Server

不过上面的代码能够启动成功,前提是你已经有了 Kafka Server 的服务环境,我们知道 Kafka 是由 Scala + Zookeeper 构建的,可以从官网下载部署包在本地部署。但是,我想告诉你,为了简化开发环节验证 Kafka 相关功能,Spring-Kafka-Test 已经封装了 Kafka-test 提供了注解式的一键开启 Kafka Server 的功能,使用起来也是超级简单。本文后面的所有测试用例的 Kafka 都是使用这种嵌入式服务提供的。

引入依赖

  1. <dependency>

  2. <groupId>org.springframework.kafka</groupId>

  3. <artifactId>spring-kafka-test</artifactId>

  4. <version>2.2.6.RELEASE</version>

  5. <scope>test</scope>

  6. </dependency>

启动服务

下面使用 Junit 测试用例,直接启动一个 Kafka Server 服务,包含四个 Broker 节点。

  1. @RunWith(SpringRunner.class)

  2. @SpringBootTest(classes = ApplicationTests.class)

  3. @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})

  4. public class ApplicationTests {

  5. @Test

  6. public void contextLoads()throws IOException {

  7. System.in.read();

  8. }

  9. }

如上:只需要一个注解 @EmbeddedKafka 即可,就可以启动一个功能完整的 Kafka 服务,是不是很酷。默认只写注解不加参数的情况下,是创建一个随机端口的 Broker,在启动的日志中会输出具体的端口以及默认的一些配置项。不过这些我们在 Kafka 安装包配置文件中的配置项,在注解参数中都可以配置,下面详解下 @EmbeddedKafka 注解中的可设置参数 :

  • value:broker 节点数量

  • count:同 value 作用一样,也是配置的 broker 的节点数量

  • controlledShutdown:控制关闭开关,主要用来在 Broker 意外关闭时减少此 Broker 上 Partition 的不可用时间

Kafka 是多 Broker 架构的高可用服务,一个 Topic 对应多个 partition,一个 Partition 可以有多个副本 Replication,这些 Replication 副本保存在多个 Broker,用于高可用。但是,虽然存在多个分区副本集,当前工作副本集却只有一个,默认就是首次分配的副本集【首选副本】为 Leader,负责写入和读取数据。当我们升级 Broker 或者更新 Broker 配置时需要重启服务,这个时候需要将 partition 转移到可用的 Broker。下面涉及到三种情况

  1.    直接关闭 Broker:当 Broker 关闭时,Broker 集群会重新进行选主操作,选出一个新的 Broker 来作为 Partition Leader,选举时此 Broker 上的 Partition 会短时不可用

  2.   开启 controlledShutdown:当 Broker 关闭时,Broker 本身会先尝试将 Leader 角色转移到其他可用的 Broker 上

  3.   使用命令行工具:使用 bin/kafka-preferred-replica-election.sh,手动触发 PartitionLeader 角色转移

  • ports:端口列表,是一个数组。对应了 count 参数,有几个 Broker,就要对应几个端口号

  • brokerProperties:Broker 参数设置,是一个数组结构,支持如下方式进行 Broker 参数设置:

  1. @EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})

  • okerPropertiesLocation:Broker 参数文件设置

功能同上面的 brokerProperties,只是 Kafka Broker 的可设置参数达 182 个之多,都像上面这样配置肯定不是最优方案,所以提供了加载本地配置文件的功能,如:

  1. @EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

创建新的 Topic

默认情况下,如果在使用 KafkaTemplate 发送消息时,Topic 不存在,会创建一个新的 Topic,默认的分区数和副本数为如下 Broker 参数来设定

  1. num.partitions = 1 #默认Topic分区数

  2. num.replica.fetchers = 1 #默认副本数

程序启动时创建 Topic

  1. /**

  2. * @author: kl @kailing.pub

  3. * @date: 2019/5/31

  4. */

  5. @Configuration

  6. public class KafkaConfig {

  7. @Bean

  8. public KafkaAdmin admin(KafkaProperties properties){

  9. KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());

  10. admin.setFatalIfBrokerNotAvailable(true);

  11. return admin;

  12. }

  13. @Bean

  14. public NewTopic topic2() {

  15. return new NewTopic("topic-kl", 1, (short) 1);

  16. }

  17. }

如果 Kafka Broker 支持(1.0.0 或更高版本),则如果发现现有 Topic 的 Partition 数少于设置的 Partition 数,则会新增新的 Partition 分区。关于 KafkaAdmin 有几个常用的用法如下:

setFatalIfBrokerNotAvailable(true):默认这个值是 False 的,在 Broker 不可用时,不影响 Spring 上下文的初始化。如果你觉得 Broker 不可用影响正常业务需要显示的将这个值设置为 True

setAutoCreate(false) : 默认值为 True,也就是 Kafka 实例化后会自动创建已经实例化的 NewTopic 对象

initialize():当 setAutoCreate 为 false 时,需要我们程序显示的调用 admin 的 initialize() 方法来初始化 NewTopic 对象

代码逻辑中创建

有时候我们在程序启动时并不知道某个 Topic 需要多少 Partition 数合适,但是又不能一股脑的直接使用 Broker 的默认设置,这个时候就需要使用 Kafka-Client 自带的 AdminClient 来进行处理。上面的 Spring 封装的 KafkaAdmin 也是使用的 AdminClient 来处理的。如:

  1. @Autowired

  2. private KafkaProperties properties;

  3. @Test

  4. public void testCreateToipc(){

  5. AdminClient client = AdminClient.create(properties.buildAdminProperties());

  6. if(client !=null){

  7. try {

  8. Collection<NewTopic> newTopics = new ArrayList<>(1);

  9. newTopics.add(new NewTopic("topic-kl",1,(short) 1));

  10. client.createTopics(newTopics);

  11. }catch (Throwable e){

  12. e.printStackTrace();

  13. }finally {

  14. client.close();

  15. }

  16. }

  17. }

ps: 其他的方式创建 Topic

上面的这些创建 Topic 方式前提是你的 spring boot 版本到 2.x 以上了,因为 spring-kafka2.x 版本只支持 spring boot2.x 的版本。在 1.x 的版本中还没有这些 api。下面补充一种在程序中通过 Kafka_2.10 创建 Topic 的方式

引入依赖

  1. <dependency>

  2. <groupId>org.apache.kafka</groupId>

  3. <artifactId>kafka_2.10</artifactId>

  4. <version>0.8.2.2</version>

  5. </dependency>

api 方式创建

  1. @Test

  2. public void testCreateTopic()throws Exception{

  3. ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)

  4. String topicName = "topic-kl";

  5. int partitions = 1;

  6. int replication = 1;

  7. AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());

  8. }

注意下 ZkClient 最后一个构造入参,是一个序列化反序列化的接口实现,博主测试如果不填的话,创建的 Topic 在 ZK 上的数据是有问题的,默认的 Kafka 实现也很简单,就是做了字符串 UTF-8 编码处理。ZKStringSerializer$ 是 Kafka 中已经实现好的一个接口实例,是一个 Scala 的伴生对象,在 Java 中直接调用点 MODULE$ 就可以得到一个实例

命令方式创建

  1. @Test

  2. public void testCreateTopic(){

  3. String [] options= new String[]{

  4. "--create",

  5. "--zookeeper","127.0.0.1:2181",

  6. "--replication-factor", "3",

  7. "--partitions", "3",

  8. "--topic", "topic-kl"

  9. };

  10. TopicCommand.main(options);

  11. }

消息发送之 KafkaTemplate 探秘

获取发送结果

异步获取

  1. template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {

  2. @Override

  3. public void onFailure(Throwable throwable) {

  4. ......

  5. }

  6.  

  7. @Override

  8. public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {

  9. ....

  10. }

  11. });

同步获取

  1. ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");

  2. try {

  3. SendResult<Object,Object> result = future.get();

  4. }catch (Throwable e){

  5. e.printStackTrace();

  6. }

kafka 事务消息

默认情况下,Spring-kafka 自动生成的 KafkaTemplate 实例,是不具有事务消息发送能力的。需要使用如下配置激活事务特性。事务激活后,所有的消息发送只能在发生事务的方法内执行了,不然就会抛一个没有事务交易的异常

  1. spring.kafka.producer.transaction-id-prefix=kafka_tx.

当发送消息有事务要求时,比如,当所有消息发送成功才算成功,如下面的例子:假设第一条消费发送后,在发第二条消息前出现了异常,那么第一条已经发送的消息也会回滚。而且正常情况下,假设在消息一发送后休眠一段时间,在发送第二条消息,消费端也只有在事务方法执行完成后才会接收到消息

  1. @GetMapping("/send/{input}")

  2. public void sendFoo(@PathVariable String input) {

  3. template.executeInTransaction(t ->{

  4. t.send("topic_input","kl");

  5. if("error".equals(input)){

  6. throw new RuntimeException("failed");

  7. }

  8. t.send("topic_input","ckl");

  9. return true;

  10. });

  11. }

当事务特性激活时,同样,在方法上面加 @Transactional 注解也会生效

  1. @GetMapping("/send/{input}")

  2. @Transactional(rollbackFor = RuntimeException.class)

  3. public void sendFoo(@PathVariable String input) {

  4. template.send("topic_input", "kl");

  5. if ("error".equals(input)) {

  6. throw new RuntimeException("failed");

  7. }

  8. template.send("topic_input", "ckl");

  9. }

Spring-Kafka 的事务消息是基于 Kafka 提供的事务消息功能的。而 Kafka Broker 默认的配置针对的三个或以上 Broker 高可用服务而设置的。这边在测试的时候为了简单方便,使用了嵌入式服务新建了一个单 Broker 的 Kafka 服务,出现了一些问题:如

1、事务日志副本集大于 Broker 数量,会抛如下异常:

 
  1. Number of alive brokers '1' does not meet the required replication factor '3'

  2. for the transactions state topic (configured via 'transaction.state.log.replication.factor').

  3. This error can be ignored if the cluster is starting up and not all brokers are up yet.

默认 Broker 的配置 transaction.state.log.replication.factor=3,单节点只能调整为 1

2、副本数小于副本同步队列数目,会抛如下异常

 
  1. Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

默认 Broker 的配置 transaction.state.log.min.isr=2,单节点只能调整为 1

ReplyingKafkaTemplate 获得消息回复

ReplyingKafkaTemplate 是 KafkaTemplate 的一个子类,除了继承父类的方法,新增了一个方法 sendAndReceive,实现了消息发送 \ 回复语义

 
  1. RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

也就是我发送一条消息,能够拿到消费者给我返回的结果。就像传统的 RPC 交互那样。当消息的发送者需要知道消息消费者的具体的消费情况,非常适合这个 api。如,一条消息中发送一批数据,需要知道消费者成功处理了哪些数据。下面代码演示了怎么集成以及使用 ReplyingKafkaTemplate

 
  1. /**

  2. * @author: kl @kailing.pub

  3. * @date: 2019/5/30

  4. */

  5. @SpringBootApplication

  6. @RestController

  7. public class Application {

  8. private final Logger logger = LoggerFactory.getLogger(Application.class);

  9. public static void main(String[] args) {

  10. SpringApplication.run(Application.class, args);

  11. }

  12. @Bean

  13. public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

  14. ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");

  15. repliesContainer.getContainerProperties().setGroupId("repliesGroup");

  16. repliesContainer.setAutoStartup(false);

  17. return repliesContainer;

  18. }

  19.  

  20. @Bean

  21. public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {

  22. return new ReplyingKafkaTemplate(pf, repliesContainer);

  23. }

  24.  

  25. @Bean

  26. public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {

  27. return new KafkaTemplate(pf);

  28. }

  29.  

  30. @Autowired

  31. private ReplyingKafkaTemplate template;

  32.  

  33. @GetMapping("/send/{input}")

  34. @Transactional(rollbackFor = RuntimeException.class)

  35. public void sendFoo(@PathVariable String input) throws Exception {

  36. ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);

  37. RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);

  38. ConsumerRecord<String, String> consumerRecord = replyFuture.get();

  39. System.err.println("Return value: " + consumerRecord.value());

  40. }

  41.  

  42. @KafkaListener(id = "webGroup", topics = "topic-kl")

  43. @SendTo

  44. public String listen(String input) {

  45. logger.info("input value: {}", input);

  46. return "successful";

  47. }

  48. }

Spring-kafka 消息消费用法探秘

@KafkaListener 的使用

前面在简单集成中已经演示过了 @KafkaListener 接收消息的能力,但是 @KafkaListener 的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:

  • 显示的指定消费哪些 Topic 和分区的消息,

  • 设置每个 Topic 以及分区初始化的偏移量,

  • 设置消费线程并发度

  • 设置消息异常处理器

 
  1. @KafkaListener(id = "webGroup", topicPartitions = {

  2. @TopicPartition(topic = "topic1", partitions = {"0", "1"}),

  3. @TopicPartition(topic = "topic2", partitions = "0",

  4. partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))

  5. },concurrency = "6",errorHandler = "myErrorHandler")

  6. public String listen(String input) {

  7. logger.info("input value: {}", input);

  8. return "successful";

  9. }

其他的注解参数都很好理解,errorHandler 需要说明下,设置这个参数需要实现一个接口 KafkaListenerErrorHandler。而且注解里的配置,是你自定义实现实例在 spring 上下文中的 Name。比如,上面配置为 errorHandler = "myErrorHandler"。则在 spring 上线中应该存在这样一个实例:

 
  1. /**

  2. * @author: kl @kailing.pub

  3. * @date: 2019/5/31

  4. */

  5. @Service("myErrorHandler")

  6. public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

  7. Logger logger =LoggerFactory.getLogger(getClass());

  8. @Override

  9. public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

  10. logger.info(message.getPayload().toString());

  11. return null;

  12. }

  13. @Override

  14. public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {

  15. logger.info(message.getPayload().toString());

  16. return null;

  17. }

  18. }

手动 Ack 模式

手动 ACK 模式,由业务逻辑控制提交偏移量。比如程序在消费时,有这种语义,特别异常情况下不确认 ack,也就是不提交偏移量,那么你只能使用手动 Ack 模式来做了。开启手动首先需要关闭自动提交,然后设置下 consumer 的消费模式

 
  1. spring.kafka.consumer.enable-auto-commit=false

  2. spring.kafka.listener.ack-mode=manual

上面的设置好后,在消费时,只需要在 @KafkaListener 监听方法的入参加入 Acknowledgment 即可,执行到 ack.acknowledge() 代表提交了偏移量

 
  1. @KafkaListener(id = "webGroup", topics = "topic-kl")

  2. public String listen(String input, Acknowledgment ack) {

  3. logger.info("input value: {}", input);

  4. if ("kl".equals(input)) {

  5. ack.acknowledge();

  6. }

  7. return "successful";

  8. }

@KafkaListener 注解***生命周期

@KafkaListener 注解的***的生命周期是可以控制的,默认情况下,@KafkaListener 的参数 autoStartup = "true"。也就是自动启动消费,但是也可以同过 KafkaListenerEndpointRegistry 来干预他的生命周期。KafkaListenerEndpointRegistry 有三个动作方法分别如:start(),pause(),resume()/ 启动,停止,继续。如下代码详细演示了这种功能。

 
  1. /**

  2. * @author: kl @kailing.pub

  3. * @date: 2019/5/30

  4. */

  5. @SpringBootApplication

  6. @RestController

  7. public class Application {

  8. private final Logger logger = LoggerFactory.getLogger(Application.class);

  9.  

  10. public static void main(String[] args) {

  11. SpringApplication.run(Application.class, args);

  12. }

  13.  

  14. @Autowired

  15. private KafkaTemplate template;

  16.  

  17. @GetMapping("/send/{input}")

  18. @Transactional(rollbackFor = RuntimeException.class)

  19. public void sendFoo(@PathVariable String input) throws Exception {

  20. ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);

  21. template.send(record);

  22. }

  23.  

  24. @Autowired

  25. private KafkaListenerEndpointRegistry registry;

  26.  

  27. @GetMapping("/stop/{listenerID}")

  28. public void stop(@PathVariable String listenerID){

  29. registry.getListenerContainer(listenerID).pause();

  30. }

  31. @GetMapping("/resume/{listenerID}")

  32. public void resume(@PathVariable String listenerID){

  33. registry.getListenerContainer(listenerID).resume();

  34. }

  35. @GetMapping("/start/{listenerID}")

  36. public void start(@PathVariable String listenerID){

  37. registry.getListenerContainer(listenerID).start();

  38. }

  39. @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")

  40. public String listen(String input) {

  41. logger.info("input value: {}", input);

  42. return "successful";

  43. }

  44. }

在上面的代码中,listenerID 就是 @KafkaListener 中的 id 值 “webGroup”。项目启动好后,分别执行如下 url,就可以看到效果了。

先发送一条消息:http://localhost:8081/send/ckl。因为 autoStartup = "false",所以并不会看到有消息进入***。

接着启动***:http://localhost:8081/start/webGroup。可以看到有一条消息进来了。

暂停和继续消费的效果使用类似方法就可以测试出来了。

SendTo 消息转发

前面的消息发送响应应用里面已经见过 @SendTo, 其实除了做发送响应语义外,@SendTo 注解还可以带一个参数,指定转发的 Topic 队列。常见的场景如,一个消息需要做多重加工,不同的加工耗费的 cup 等资源不一致,那么就可以通过跨不同 Topic 和部署在不同主机上的 consumer 来解决了。如:

 
  1. @KafkaListener(id = "webGroup", topics = "topic-kl")

  2. @SendTo("topic-ckl")

  3. public String listen(String input) {

  4. logger.info("input value: {}", input);

  5. return input + "hello!";

  6. }

  7.  

  8. @KafkaListener(id = "webGroup1", topics = "topic-ckl")

  9. public void listen2(String input) {

  10. logger.info("input value: {}", input);

  11. }

消息重试和死信队列的应用

除了上面谈到的通过手动 Ack 模式来控制消息偏移量外,其实 Spring-kafka 内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。而且可以设置重试达到多少次后,让消息进入预定好的 Topic。也就是死信队列里。下面代码演示了这种效果:

 
  1. @Autowired

  2. private KafkaTemplate template;

  3.  

  4. @Bean

  5. public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(

  6. ConcurrentKafkaListenerContainerFactoryConfigurer configurer,

  7. ConsumerFactory<Object, Object> kafkaConsumerFactory,

  8. KafkaTemplate<Object, Object> template) {

  9. ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

  10. configurer.configure(factory, kafkaConsumerFactory);

  11. //最大重试三次

  12. factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));

  13. return factory;

  14. }

  15.  

  16. @GetMapping("/send/{input}")

  17. public void sendFoo(@PathVariable String input) {

  18. template.send("topic-kl", input);

  19. }

  20.  

  21. @KafkaListener(id = "webGroup", topics = "topic-kl")

  22. public String listen(String input) {

  23. logger.info("input value: {}", input);

  24. throw new RuntimeException("dlt");

  25. }

  26.  

  27. @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")

  28. public void dltListen(String input) {

  29. logger.info("Received from DLT: " + input);

  30. }

上面应用,在 topic-kl 监听到消息会,会触发运行时异常,然后***会尝试三次调用,当到达最大的重试次数后。消息就会被丢掉重试死信队列里面去。死信队列的 Topic 的规则是,业务 Topic 名字 +“.DLT”。如上面业务 Topic 的 name 为 “topic-kl”,那么对应的死信队列的 Topic 就是 “topic-kl.DLT”

文末结语

最近业务上使用了 kafka 用到了 Spring-kafka,所以系统性的探索了下 Spring-kafka 的各种用法,发现了很多好玩很酷的特性,比如,一个注解开启嵌入式的 Kafka 服务、像 RPC 调用一样的发送、响应语义调用、事务消息等功能。希望此博文能够帮助那些正在使用 Spring-kafka 或即将使用的人少走一些弯路少踩一点坑。