前言
kafka 是一个消息队列产品,基于 Topic partitions 的设计,能达到非常高的消息发送处理性能。Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。
简单集成
引入依赖
-
<dependency> -
<groupId>org.springframework.kafka</groupId> -
<artifactId>spring-kafka</artifactId> -
<version>2.2.6.RELEASE</version> -
</dependency>
添加配置
-
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
测试发送和接收
-
/** -
* @author: kl @kailing.pub -
* @date: 2019/5/30 -
*/ -
@SpringBootApplication -
@RestController -
public class Application { -
-
private final Logger logger = LoggerFactory.getLogger(Application.class); -
-
public static void main(String[] args) { -
SpringApplication.run(Application.class, args); -
} -
-
@Autowired -
private KafkaTemplate<Object, Object> template; -
-
@GetMapping("/send/{input}") -
public void sendFoo(@PathVariable String input) { -
this.template.send("topic_input", input); -
} -
@KafkaListener(id = "webGroup", topics = "topic_input") -
public void listen(String input) { -
logger.info("input value: {}" , input); -
} -
}
启动应用后,在浏览器中输入:http://localhost:8080/send/kl。就可以在控制台看到有日志输出了:input value: "kl"。基础的使用就这么简单。发送消息时注入一个 KafkaTemplate,接收消息时添加一个 @KafkaListener 注解即可。
Spring-kafka-test 嵌入式 Kafka Server
不过上面的代码能够启动成功,前提是你已经有了 Kafka Server 的服务环境,我们知道 Kafka 是由 Scala + Zookeeper 构建的,可以从官网下载部署包在本地部署。但是,我想告诉你,为了简化开发环节验证 Kafka 相关功能,Spring-Kafka-Test 已经封装了 Kafka-test 提供了注解式的一键开启 Kafka Server 的功能,使用起来也是超级简单。本文后面的所有测试用例的 Kafka 都是使用这种嵌入式服务提供的。
引入依赖
-
<dependency> -
<groupId>org.springframework.kafka</groupId> -
<artifactId>spring-kafka-test</artifactId> -
<version>2.2.6.RELEASE</version> -
<scope>test</scope> -
</dependency>
启动服务
下面使用 Junit 测试用例,直接启动一个 Kafka Server 服务,包含四个 Broker 节点。
-
@RunWith(SpringRunner.class) -
@SpringBootTest(classes = ApplicationTests.class) -
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) -
public class ApplicationTests { -
@Test -
public void contextLoads()throws IOException { -
System.in.read(); -
} -
}
如上:只需要一个注解 @EmbeddedKafka 即可,就可以启动一个功能完整的 Kafka 服务,是不是很酷。默认只写注解不加参数的情况下,是创建一个随机端口的 Broker,在启动的日志中会输出具体的端口以及默认的一些配置项。不过这些我们在 Kafka 安装包配置文件中的配置项,在注解参数中都可以配置,下面详解下 @EmbeddedKafka 注解中的可设置参数 :
-
value:broker 节点数量
-
count:同 value 作用一样,也是配置的 broker 的节点数量
-
controlledShutdown:控制关闭开关,主要用来在 Broker 意外关闭时减少此 Broker 上 Partition 的不可用时间
Kafka 是多 Broker 架构的高可用服务,一个 Topic 对应多个 partition,一个 Partition 可以有多个副本 Replication,这些 Replication 副本保存在多个 Broker,用于高可用。但是,虽然存在多个分区副本集,当前工作副本集却只有一个,默认就是首次分配的副本集【首选副本】为 Leader,负责写入和读取数据。当我们升级 Broker 或者更新 Broker 配置时需要重启服务,这个时候需要将 partition 转移到可用的 Broker。下面涉及到三种情况
-
直接关闭 Broker:当 Broker 关闭时,Broker 集群会重新进行选主操作,选出一个新的 Broker 来作为 Partition Leader,选举时此 Broker 上的 Partition 会短时不可用
-
开启 controlledShutdown:当 Broker 关闭时,Broker 本身会先尝试将 Leader 角色转移到其他可用的 Broker 上
-
使用命令行工具:使用 bin/kafka-preferred-replica-election.sh,手动触发 PartitionLeader 角色转移
-
ports:端口列表,是一个数组。对应了 count 参数,有几个 Broker,就要对应几个端口号
-
brokerProperties:Broker 参数设置,是一个数组结构,支持如下方式进行 Broker 参数设置:
-
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
-
okerPropertiesLocation:Broker 参数文件设置
功能同上面的 brokerProperties,只是 Kafka Broker 的可设置参数达 182 个之多,都像上面这样配置肯定不是最优方案,所以提供了加载本地配置文件的功能,如:
-
@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
创建新的 Topic
默认情况下,如果在使用 KafkaTemplate 发送消息时,Topic 不存在,会创建一个新的 Topic,默认的分区数和副本数为如下 Broker 参数来设定
-
num.partitions = 1 #默认Topic分区数 -
num.replica.fetchers = 1 #默认副本数
程序启动时创建 Topic
-
/** -
* @author: kl @kailing.pub -
* @date: 2019/5/31 -
*/ -
@Configuration -
public class KafkaConfig { -
@Bean -
public KafkaAdmin admin(KafkaProperties properties){ -
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); -
admin.setFatalIfBrokerNotAvailable(true); -
return admin; -
} -
@Bean -
public NewTopic topic2() { -
return new NewTopic("topic-kl", 1, (short) 1); -
} -
}
如果 Kafka Broker 支持(1.0.0 或更高版本),则如果发现现有 Topic 的 Partition 数少于设置的 Partition 数,则会新增新的 Partition 分区。关于 KafkaAdmin 有几个常用的用法如下:
setFatalIfBrokerNotAvailable(true):默认这个值是 False 的,在 Broker 不可用时,不影响 Spring 上下文的初始化。如果你觉得 Broker 不可用影响正常业务需要显示的将这个值设置为 True
setAutoCreate(false) : 默认值为 True,也就是 Kafka 实例化后会自动创建已经实例化的 NewTopic 对象
initialize():当 setAutoCreate 为 false 时,需要我们程序显示的调用 admin 的 initialize() 方法来初始化 NewTopic 对象
代码逻辑中创建
有时候我们在程序启动时并不知道某个 Topic 需要多少 Partition 数合适,但是又不能一股脑的直接使用 Broker 的默认设置,这个时候就需要使用 Kafka-Client 自带的 AdminClient 来进行处理。上面的 Spring 封装的 KafkaAdmin 也是使用的 AdminClient 来处理的。如:
-
@Autowired -
private KafkaProperties properties; -
@Test -
public void testCreateToipc(){ -
AdminClient client = AdminClient.create(properties.buildAdminProperties()); -
if(client !=null){ -
try { -
Collection<NewTopic> newTopics = new ArrayList<>(1); -
newTopics.add(new NewTopic("topic-kl",1,(short) 1)); -
client.createTopics(newTopics); -
}catch (Throwable e){ -
e.printStackTrace(); -
}finally { -
client.close(); -
} -
} -
}
ps: 其他的方式创建 Topic
上面的这些创建 Topic 方式前提是你的 spring boot 版本到 2.x 以上了,因为 spring-kafka2.x 版本只支持 spring boot2.x 的版本。在 1.x 的版本中还没有这些 api。下面补充一种在程序中通过 Kafka_2.10 创建 Topic 的方式
引入依赖
-
<dependency> -
<groupId>org.apache.kafka</groupId> -
<artifactId>kafka_2.10</artifactId> -
<version>0.8.2.2</version> -
</dependency>
api 方式创建
-
@Test -
public void testCreateTopic()throws Exception{ -
ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$) -
String topicName = "topic-kl"; -
int partitions = 1; -
int replication = 1; -
AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties()); -
}
注意下 ZkClient 最后一个构造入参,是一个序列化反序列化的接口实现,博主测试如果不填的话,创建的 Topic 在 ZK 上的数据是有问题的,默认的 Kafka 实现也很简单,就是做了字符串 UTF-8 编码处理。ZKStringSerializer$ 是 Kafka 中已经实现好的一个接口实例,是一个 Scala 的伴生对象,在 Java 中直接调用点 MODULE$ 就可以得到一个实例
命令方式创建
-
@Test -
public void testCreateTopic(){ -
String [] options= new String[]{ -
"--create", -
"--zookeeper","127.0.0.1:2181", -
"--replication-factor", "3", -
"--partitions", "3", -
"--topic", "topic-kl" -
}; -
TopicCommand.main(options); -
}
消息发送之 KafkaTemplate 探秘
获取发送结果
异步获取
-
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() { -
@Override -
public void onFailure(Throwable throwable) { -
...... -
} -
-
@Override -
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) { -
.... -
} -
});
同步获取
-
ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl"); -
try { -
SendResult<Object,Object> result = future.get(); -
}catch (Throwable e){ -
e.printStackTrace(); -
}
kafka 事务消息
默认情况下,Spring-kafka 自动生成的 KafkaTemplate 实例,是不具有事务消息发送能力的。需要使用如下配置激活事务特性。事务激活后,所有的消息发送只能在发生事务的方法内执行了,不然就会抛一个没有事务交易的异常
-
spring.kafka.producer.transaction-id-prefix=kafka_tx.
当发送消息有事务要求时,比如,当所有消息发送成功才算成功,如下面的例子:假设第一条消费发送后,在发第二条消息前出现了异常,那么第一条已经发送的消息也会回滚。而且正常情况下,假设在消息一发送后休眠一段时间,在发送第二条消息,消费端也只有在事务方法执行完成后才会接收到消息
-
@GetMapping("/send/{input}") -
public void sendFoo(@PathVariable String input) { -
template.executeInTransaction(t ->{ -
t.send("topic_input","kl"); -
if("error".equals(input)){ -
throw new RuntimeException("failed"); -
} -
t.send("topic_input","ckl"); -
return true; -
}); -
}
当事务特性激活时,同样,在方法上面加 @Transactional 注解也会生效
-
@GetMapping("/send/{input}") -
@Transactional(rollbackFor = RuntimeException.class) -
public void sendFoo(@PathVariable String input) { -
template.send("topic_input", "kl"); -
if ("error".equals(input)) { -
throw new RuntimeException("failed"); -
} -
template.send("topic_input", "ckl"); -
}
Spring-Kafka 的事务消息是基于 Kafka 提供的事务消息功能的。而 Kafka Broker 默认的配置针对的三个或以上 Broker 高可用服务而设置的。这边在测试的时候为了简单方便,使用了嵌入式服务新建了一个单 Broker 的 Kafka 服务,出现了一些问题:如
1、事务日志副本集大于 Broker 数量,会抛如下异常:
-
Number of alive brokers '1' does not meet the required replication factor '3' -
for the transactions state topic (configured via 'transaction.state.log.replication.factor'). -
This error can be ignored if the cluster is starting up and not all brokers are up yet.
默认 Broker 的配置 transaction.state.log.replication.factor=3,单节点只能调整为 1
2、副本数小于副本同步队列数目,会抛如下异常
-
Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
默认 Broker 的配置 transaction.state.log.min.isr=2,单节点只能调整为 1
ReplyingKafkaTemplate 获得消息回复
ReplyingKafkaTemplate 是 KafkaTemplate 的一个子类,除了继承父类的方法,新增了一个方法 sendAndReceive,实现了消息发送 \ 回复语义
-
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
也就是我发送一条消息,能够拿到消费者给我返回的结果。就像传统的 RPC 交互那样。当消息的发送者需要知道消息消费者的具体的消费情况,非常适合这个 api。如,一条消息中发送一批数据,需要知道消费者成功处理了哪些数据。下面代码演示了怎么集成以及使用 ReplyingKafkaTemplate
-
/** -
* @author: kl @kailing.pub -
* @date: 2019/5/30 -
*/ -
@SpringBootApplication -
@RestController -
public class Application { -
private final Logger logger = LoggerFactory.getLogger(Application.class); -
public static void main(String[] args) { -
SpringApplication.run(Application.class, args); -
} -
@Bean -
public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) { -
ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies"); -
repliesContainer.getContainerProperties().setGroupId("repliesGroup"); -
repliesContainer.setAutoStartup(false); -
return repliesContainer; -
} -
-
@Bean -
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) { -
return new ReplyingKafkaTemplate(pf, repliesContainer); -
} -
-
@Bean -
public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) { -
return new KafkaTemplate(pf); -
} -
-
@Autowired -
private ReplyingKafkaTemplate template; -
-
@GetMapping("/send/{input}") -
@Transactional(rollbackFor = RuntimeException.class) -
public void sendFoo(@PathVariable String input) throws Exception { -
ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); -
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record); -
ConsumerRecord<String, String> consumerRecord = replyFuture.get(); -
System.err.println("Return value: " + consumerRecord.value()); -
} -
-
@KafkaListener(id = "webGroup", topics = "topic-kl") -
@SendTo -
public String listen(String input) { -
logger.info("input value: {}", input); -
return "successful"; -
} -
}
Spring-kafka 消息消费用法探秘
@KafkaListener 的使用
前面在简单集成中已经演示过了 @KafkaListener 接收消息的能力,但是 @KafkaListener 的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:
-
显示的指定消费哪些 Topic 和分区的消息,
-
设置每个 Topic 以及分区初始化的偏移量,
-
设置消费线程并发度
-
设置消息异常处理器
-
@KafkaListener(id = "webGroup", topicPartitions = { -
@TopicPartition(topic = "topic1", partitions = {"0", "1"}), -
@TopicPartition(topic = "topic2", partitions = "0", -
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) -
},concurrency = "6",errorHandler = "myErrorHandler") -
public String listen(String input) { -
logger.info("input value: {}", input); -
return "successful"; -
}
其他的注解参数都很好理解,errorHandler 需要说明下,设置这个参数需要实现一个接口 KafkaListenerErrorHandler。而且注解里的配置,是你自定义实现实例在 spring 上下文中的 Name。比如,上面配置为 errorHandler = "myErrorHandler"。则在 spring 上线中应该存在这样一个实例:
-
/** -
* @author: kl @kailing.pub -
* @date: 2019/5/31 -
*/ -
@Service("myErrorHandler") -
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { -
Logger logger =LoggerFactory.getLogger(getClass()); -
@Override -
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { -
logger.info(message.getPayload().toString()); -
return null; -
} -
@Override -
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { -
logger.info(message.getPayload().toString()); -
return null; -
} -
}
手动 Ack 模式
手动 ACK 模式,由业务逻辑控制提交偏移量。比如程序在消费时,有这种语义,特别异常情况下不确认 ack,也就是不提交偏移量,那么你只能使用手动 Ack 模式来做了。开启手动首先需要关闭自动提交,然后设置下 consumer 的消费模式
-
spring.kafka.consumer.enable-auto-commit=false -
spring.kafka.listener.ack-mode=manual
上面的设置好后,在消费时,只需要在 @KafkaListener 监听方法的入参加入 Acknowledgment 即可,执行到 ack.acknowledge() 代表提交了偏移量
-
@KafkaListener(id = "webGroup", topics = "topic-kl") -
public String listen(String input, Acknowledgment ack) { -
logger.info("input value: {}", input); -
if ("kl".equals(input)) { -
ack.acknowledge(); -
} -
return "successful"; -
}
@KafkaListener 注解***生命周期
@KafkaListener 注解的***的生命周期是可以控制的,默认情况下,@KafkaListener 的参数 autoStartup = "true"。也就是自动启动消费,但是也可以同过 KafkaListenerEndpointRegistry 来干预他的生命周期。KafkaListenerEndpointRegistry 有三个动作方法分别如:start(),pause(),resume()/ 启动,停止,继续。如下代码详细演示了这种功能。
-
/** -
* @author: kl @kailing.pub -
* @date: 2019/5/30 -
*/ -
@SpringBootApplication -
@RestController -
public class Application { -
private final Logger logger = LoggerFactory.getLogger(Application.class); -
-
public static void main(String[] args) { -
SpringApplication.run(Application.class, args); -
} -
-
@Autowired -
private KafkaTemplate template; -
-
@GetMapping("/send/{input}") -
@Transactional(rollbackFor = RuntimeException.class) -
public void sendFoo(@PathVariable String input) throws Exception { -
ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); -
template.send(record); -
} -
-
@Autowired -
private KafkaListenerEndpointRegistry registry; -
-
@GetMapping("/stop/{listenerID}") -
public void stop(@PathVariable String listenerID){ -
registry.getListenerContainer(listenerID).pause(); -
} -
@GetMapping("/resume/{listenerID}") -
public void resume(@PathVariable String listenerID){ -
registry.getListenerContainer(listenerID).resume(); -
} -
@GetMapping("/start/{listenerID}") -
public void start(@PathVariable String listenerID){ -
registry.getListenerContainer(listenerID).start(); -
} -
@KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false") -
public String listen(String input) { -
logger.info("input value: {}", input); -
return "successful"; -
} -
}
在上面的代码中,listenerID 就是 @KafkaListener 中的 id 值 “webGroup”。项目启动好后,分别执行如下 url,就可以看到效果了。
先发送一条消息:http://localhost:8081/send/ckl。因为 autoStartup = "false",所以并不会看到有消息进入***。
接着启动***:http://localhost:8081/start/webGroup。可以看到有一条消息进来了。
暂停和继续消费的效果使用类似方法就可以测试出来了。
SendTo 消息转发
前面的消息发送响应应用里面已经见过 @SendTo, 其实除了做发送响应语义外,@SendTo 注解还可以带一个参数,指定转发的 Topic 队列。常见的场景如,一个消息需要做多重加工,不同的加工耗费的 cup 等资源不一致,那么就可以通过跨不同 Topic 和部署在不同主机上的 consumer 来解决了。如:
-
@KafkaListener(id = "webGroup", topics = "topic-kl") -
@SendTo("topic-ckl") -
public String listen(String input) { -
logger.info("input value: {}", input); -
return input + "hello!"; -
} -
-
@KafkaListener(id = "webGroup1", topics = "topic-ckl") -
public void listen2(String input) { -
logger.info("input value: {}", input); -
}
消息重试和死信队列的应用
除了上面谈到的通过手动 Ack 模式来控制消息偏移量外,其实 Spring-kafka 内部还封装了可重试消费消息的语义,也就是可以设置为当消费数据出现异常时,重试这个消息。而且可以设置重试达到多少次后,让消息进入预定好的 Topic。也就是死信队列里。下面代码演示了这种效果:
-
@Autowired -
private KafkaTemplate template; -
-
@Bean -
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( -
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, -
ConsumerFactory<Object, Object> kafkaConsumerFactory, -
KafkaTemplate<Object, Object> template) { -
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); -
configurer.configure(factory, kafkaConsumerFactory); -
//最大重试三次 -
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3)); -
return factory; -
} -
-
@GetMapping("/send/{input}") -
public void sendFoo(@PathVariable String input) { -
template.send("topic-kl", input); -
} -
-
@KafkaListener(id = "webGroup", topics = "topic-kl") -
public String listen(String input) { -
logger.info("input value: {}", input); -
throw new RuntimeException("dlt"); -
} -
-
@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT") -
public void dltListen(String input) { -
logger.info("Received from DLT: " + input); -
}
上面应用,在 topic-kl 监听到消息会,会触发运行时异常,然后***会尝试三次调用,当到达最大的重试次数后。消息就会被丢掉重试死信队列里面去。死信队列的 Topic 的规则是,业务 Topic 名字 +“.DLT”。如上面业务 Topic 的 name 为 “topic-kl”,那么对应的死信队列的 Topic 就是 “topic-kl.DLT”
文末结语
最近业务上使用了 kafka 用到了 Spring-kafka,所以系统性的探索了下 Spring-kafka 的各种用法,发现了很多好玩很酷的特性,比如,一个注解开启嵌入式的 Kafka 服务、像 RPC 调用一样的发送、响应语义调用、事务消息等功能。希望此博文能够帮助那些正在使用 Spring-kafka 或即将使用的人少走一些弯路少踩一点坑。



京公网安备 11010502036488号