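Preface
This post summarizes the consumer-side configuration and API involved in integrating spring-kafka into Spring Boot.
There is somewhat more to cover here than on the producer side.
Integration configuration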
server:
  port: 9000
spring:
  kafka:
    bootstrap-servers: 192.168.1.74:9092
    consumer:
      group-id: group_id
      # Disable auto-commit; offsets are committed manually
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
    listener:
      # Batch listener: the records returned by one poll are delivered together
      type: batch
      log-container-config: false
      concurrency: 3
      # Manual commit: the offset is committed as soon as the listener acknowledges
      ack-mode: manual_immediate
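The `spring.kafka.consumer.*` keys above correspond to standard Kafka consumer property names. As a rough illustration, the following plain-Java sketch builds the equivalent `java.util.Properties` by hand. The class name `ConsumerPropsSketch` is made up for this example, and the sketch covers only the consumer keys: the `listener.*` settings (`type`, `concurrency`, `ack-mode`) configure Spring's listener container and have no Kafka client property counterpart.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the consumer section of the YAML using raw Kafka property names.
final class ConsumerPropsSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.1.74:9092");
        props.setProperty("group.id", "group_id");
        // enable-auto-commit: false -> offsets are committed by the application/container
        props.setProperty("enable.auto.commit", "false");
        // auto-offset-reset: earliest -> start from the oldest record when no committed offset exists
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Entries under consumer.properties are passed through under the same name
        props.setProperty("session.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // Print every entry; iteration order of Properties is not guaranteed
        consumerProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A `Properties` object like this is what one would hand to a raw `KafkaConsumer` when not using Spring Boot's auto-configuration.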
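1. @KafkaListener
Setting up a message listener with Spring Kafka is very simple: a single annotation is enough to start consuming.
Even though only one annotation is required, you still need to pay attention to how messages are deserialized and to how the listener processes and acknowledges them.
2. Code examples
Example 1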
// Lives inside a Spring bean that declares an SLF4J `logger` field.
@KafkaListener(topics = "users", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
    try {
        logger.info("#### -> Consumed message -> {}", message);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        // Commit the offset manually. Because this runs in finally,
        // the record is acknowledged even when processing failed.
        acknowledgment.acknowledge();
    }
}
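Where the acknowledgment call sits determines which records get acknowledged. The sketch below is a plain-Java model, not spring-kafka code: `Ack` is a hypothetical stand-in for `Acknowledgment`, and the handler that rejects empty messages is invented for illustration. It compares acknowledging in `finally` (as in Example 1) with acknowledging only after successful processing.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class AckPlacementSketch {

    /** Hypothetical stand-in for spring-kafka's Acknowledgment; models only acknowledge(). */
    interface Ack {
        void acknowledge();
    }

    // Pattern from Example 1: acknowledge in finally, so failed records are acknowledged too.
    static void ackAlways(String message, Consumer<String> handler, Ack ack) {
        try {
            handler.accept(message);
        } catch (RuntimeException e) {
            System.err.println("processing failed: " + e.getMessage());
        } finally {
            ack.acknowledge();
        }
    }

    // Alternative: acknowledge only when the handler returned normally.
    static void ackOnSuccess(String message, Consumer<String> handler, Ack ack) {
        try {
            handler.accept(message);
            ack.acknowledge();
        } catch (RuntimeException e) {
            System.err.println("processing failed, not acknowledged: " + e.getMessage());
        }
    }

    // Runs every message through one of the two patterns and counts acknowledgments.
    static int countAcks(List<String> messages, boolean always) {
        AtomicInteger acks = new AtomicInteger();
        // Invented handler: treats an empty message as a processing failure.
        Consumer<String> handler = m -> {
            if (m.isEmpty()) {
                throw new IllegalArgumentException("empty message");
            }
        };
        for (String m : messages) {
            if (always) {
                ackAlways(m, handler, acks::incrementAndGet);
            } else {
                ackOnSuccess(m, handler, acks::incrementAndGet);
            }
        }
        return acks.get();
    }
}
```

The model only counts acknowledgments. In a real listener container, what happens to a record that is never acknowledged depends on the configured error handling and on later commits for the same partition, so treat this as an illustration of the trade-off rather than of Kafka's redelivery behavior.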
Example 2
import com.common.Bar2;
import com.common.Foo2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Defines the consumer group and the topics it listens to.
 *
 * @author Gary Russell
 * @since 5.1
 */
@Component
@KafkaListener(id = "multiGroup", topics = {"foos", "bars", "test"})
public class MultiMethods {

    private final Logger logger = LoggerFactory.getLogger(MultiMethods.class);

    // Chosen when the payload is a Foo2
    @KafkaHandler
    public void foo(Foo2 foo) {
        System.out.println("Received: " + foo);
    }

    // Chosen when the payload is a Bar2
    @KafkaHandler
    public void bar(Bar2 bar) {
        System.out.println("Received: " + bar);
    }

    // Fallback for payload types that no other handler accepts
    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }

    @KafkaHandler
    public void tests(ConsumerRecord<?, ?> record) {
        logger.info("Received -> key: {}", record.key());
    }
}
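With a class-level `@KafkaListener`, the `@KafkaHandler` method to invoke is chosen by the type of the payload, with the `isDefault = true` method as the fallback. The following plain-Java sketch models that idea in a simplified form. It is not Spring's implementation, and the `HandlerDispatchSketch` class with its `on` and `dispatch` methods is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of payload-type dispatch with a default handler.
final class HandlerDispatchSketch {

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    HandlerDispatchSketch(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    // Register a handler for one payload type (plays the role of a @KafkaHandler method).
    <T> HandlerDispatchSketch on(Class<T> type, Function<? super T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Pick the first registered handler whose type accepts the payload, else the default.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher =
                new HandlerDispatchSketch(o -> "Received unknown: " + o)
                        .on(Integer.class, i -> "Received int: " + i)
                        .on(String.class, s -> "Received string: " + s);

        System.out.println(dispatcher.dispatch(42));
        System.out.println(dispatcher.dispatch("hello"));
        System.out.println(dispatcher.dispatch(1.5));
    }
}
```

One consequence worth checking in a real setup: typed handlers such as `foo(Foo2)` can only match if deserialization or message conversion actually produces `Foo2` objects. With the `StringDeserializer` from the configuration and no additional converter, payloads would presumably arrive as `String` and end up in the default handler.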