写在前面
这里我会总结一下,Java 项目集成中,kafka Producer 的相关API使用,以及相关注意事项
集成方式 : spring-kafka
集成配置
spring:
kafka:
bootstrap-servers: 192.168.1.74:9092
producer:
retries: 3
batch-size: 16384 # 默认 16Kb
buffer-memory: 33554432 # 32M
transaction-id-prefix: tx.
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
这里的几个参数需要注意
- retries ,重试次数
- batch-size,batch 的大小
- buffer-memory,内存缓冲大小
- transaction-id-prefix,事务前缀
一、topic
关于 Topic 的生成、管理,也是很重要的一部分
可 包含以下
- topic 与topic.DLT的对称管理
- message 的序列化与反序列化
- key,分区,副本
- 异常消息转换、处理
在使用中都是要特别留意的知识点
二、KafkaTemplate 相关API
发布相关的API,send
更多API
三、相关使用
3.1、回调函数示例
@Transactional
public void sendMessage2(String message) {
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(TOPIC, message);
send.addCallback(
result -> {
logger.info("当前线程ID {} send {} to {} success", Thread.currentThread().getId(), message, TOPIC);
},
ex -> {
// serviceWithRetry();
logger.info("当前线程ID {} send message to {} failure,当前消息{} ,error message:{}", Thread.currentThread().getId(), TOPIC, message, ex.getMessage());
}
);
}
水平有限,这里只是简单介绍了一下,更多 Topic 监控,producer 模式,等最佳实现方式,还有待测试,总结
源码 地址 Gitee