写在前面

这里我会总结一下,Java 项目集成中,kafka Producer 的相关API使用,以及相关注意事项

集成方式 : spring-kafka

集成配置

spring:
  kafka:
    bootstrap-servers: 192.168.1.74:9092

    producer:
      retries: 3
      batch-size: 16384 # 默认 16Kb
      buffer-memory: 33554432 # 32M
      transaction-id-prefix: tx.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

这里的几个参数需要注意

  • retries ,重试次数
  • batch-size,batch 的大小
  • buffer-memory,内存缓冲大小
  • transaction-id-prefix,事务前缀

一、topic

关于 Topic 的生成、管理,也是很重要的一部分

可 包含以下

  • topic 与topic.DLT的对称管理
  • message 的序列化与反序列化
  • key,分区,副本
  • 异常消息转换、处理

在使用中都是要特别留意的知识点

二、KafkaTemplate 相关API

发布相关的API,send

更多API

三、相关使用

3.1、回调函数示例

   @Transactional
    public void sendMessage2(String message) {
   
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(TOPIC, message);
        send.addCallback(
                result -> {
   
                    logger.info("当前线程ID {} send {} to {} success", Thread.currentThread().getId(), message, TOPIC);
                },
                ex -> {
   
// serviceWithRetry();
                    logger.info("当前线程ID {} send message to {} failure,当前消息{} ,error message:{}", Thread.currentThread().getId(), TOPIC, message, ex.getMessage());
                }
        );
    }

水平有限,这里只是简单介绍了一下,更多 Topic 监控,producer 模式,等最佳实现方式,还有待测试,总结

源码 地址 Gitee

四、更多Kafka,,客户端、服务端,监控可参考下文