生产者流程
- 配置生产者客户端参数及创建响应的生产者实例
- 构建待发送的消息
- 发送消息
- 关闭生产者实例
参数配置
必要参数配置
bootstrap.servers: 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,格式:host1:port1,host2:port2 key.serializer&value.serializer: broker端接收的消息必须以字节数组(byte[])的形式存在。
其他参数配置
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo"); 用来设定KafkaProducer对应的客户端id 如果不设置:KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”,“producer-2”
创建生产者实例
KafkaProducer是线程安全的,可以在多个线程***享单个KafkaProducer实例
KafkaProducer中有多个构造方法,比如在创建KafkaProducer实例时并没有设定“key.serializer”& “value.serializer”,那么就需要在构造方法中添加对应的序列化器,如下:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerialiser(), new StringSerialiser());
内部原理其实一样
消息的发送
属性
构造方法
发送消息的三种模式
发后即忘(fire-and-forget)
它只管往Kafka发送消息而并不关心消息是否正确到达。在大多数情况下,这种方式没有什么问题,不过在某些时候(比如发生不可重试异常)会造成消息的丢失。这种发送方式的性能最高,可靠性最差。
同步(sync)
KafkaProducer的send方法并非是void类型,而是Future类型 通过get方法阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。发生异常,捕获,交由外界处理
不可重试异常:不会进行任何重试,直接抛出,如RecordToolLargeException,所发消息太大; 可重试异常:如果配置了retries参数,那么只要在规定的重试次数内自行恢复,就不会抛出异常。 如: properties.put(ProducerConfig.RETRIES_CONFIG, 10);
可靠性高,要么成功,要么异常,性能差很多,需要阻塞等待一条信息发送完之后才能发送下一条。
异步(async)
一般是在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的逻辑处理。
onCompletion()方法里的参数是互斥的。
拓展
producer.send(record1, callback1); producer.send(record2, callback2);
对于一个分区而言,如果消息record1先于消息record2发送,那么KafkaProducer就可以保证callback1先于 callback2调用,也就是说,回调函数的调用可以保证分区有序。
关闭
producer.close();
KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,发送完后,需要调用close()方> 法来回收资源。
close()方***阻塞等待之前所有的发送请求完成后在关闭KafkaProducer。
序列化
configure()方法用来配置当前类
serialize()方法用来执行序列化操作
close()方法用来关闭当前的序列化器
分区器
消息通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer) 和分区器(Partitioner)的一系列作用后才能被真正地发往broker。
消息经过序列化后就需要确定发往哪个分区,如果消息ProducerRecord中指定了partition字段,> 那么就不需要分区器的作用。
Partitioner
partition()方法用来计算分区号。 参数(主题、键、序列化后的键、值、序列化后的值、集群的元数据信息)
close()方法用来关闭分区器的时候回收一些资源
DefaultPartitioner
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某些规则过滤不符合要求的消息、 修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作
ProducerInterceptor
onsend ()方法用来KafkaProducer将消息序列化和计算分区之前会调用生产者拦截器的方法来 对消息进行相应的定制化操作
onAcknowledgement()方法,KafkaProducer会在消息被应答(Acknowledgement)之前或消息 发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback之前 执行。这个方法运行在Producer的I/O线程中,所以这个方法中实现的代码逻辑越简单越好, 否则会影响消息的发送速度。
close()方法用于在关闭拦截器时执行一些资源的清理工作
原文链接:https://juejin.cn/post/6914892483281289230
如果觉得本文对你有帮助,可以关注一下我公众号,回复关键字【面试】即可得到一份Java核心知识点整理与一份面试大礼包!另有更多技术干货文章以及相关资料共享,大家一起学习进步!