## Introduction

Kafka is a distributed messaging system developed in Scala, with support for partitioning and multiple replicas; it can process large volumes of data in real time to serve a wide range of scenarios. Its distributed storage model is also one of the biggest differences between Kafka and RabbitMQ.
## Common Terminology

Term | Description
---|---
Broker | A Kafka message service node; the smallest unit that makes up a Kafka cluster
Topic | Used to categorize messages; every message belongs to a Topic
Producer | Message producer; sends messages to Brokers
Consumer | Message consumer; pulls messages from Brokers
ConsumerGroup | The group a consumer belongs to; a message can be consumed by multiple consumer groups, but within each group it is consumed by only one consumer
Partition | A topic can be divided into multiple partitions; messages within a partition are ordered
The diagram below illustrates how these terms relate:
## Quick Start

### Download and Install

```shell
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zxf kafka_2.12-2.6.2.tgz
cd kafka_2.12-2.6.2
```
### Edit the Configuration

```properties
# Edit with: vi config/server.properties

# Settings explained:
# broker.id must be unique within the Kafka cluster
broker.id=0
# ip:port of the machine Kafka is deployed on
listeners=PLAINTEXT://192.168.1.161:9092
# Directory for message log files
log.dirs=/usr/local/data/kafka-logs
# ZooKeeper connection address
zookeeper.connect=localhost:2181
# Default number of partitions per topic (1 is the shipped default)
num.partitions=1
```
### Start and Stop

```shell
# Before starting, list the ZK nodes -- only the zookeeper node exists
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
# Start in the background
bin/kafka-server-start.sh -daemon config/server.properties
# After starting, list the ZK nodes again -- Kafka has written a lot of metadata
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
# broker.id is registered under /brokers/ids
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
# Shut down
bin/kafka-server-stop.sh
```
## Common Commands

```shell
# Create a topic; --replication-factor sets the replica count, --partitions sets the partition count
bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 1 --partitions 1 --topic hello
# List topics
bin/kafka-topics.sh --list --zookeeper 192.168.1.161:2181
# Delete a topic
bin/kafka-topics.sh --delete --topic hello --zookeeper 192.168.1.161:2181
# Produce messages
bin/kafka-console-producer.sh --broker-list 192.168.1.161:9092 --topic hello
# Consume only new messages
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --topic hello
# Consume all messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --from-beginning --topic hello
# Consume multiple topics
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --whitelist "hello|hello1"
# Unicast (give multiple consumers the same group) vs. multicast (give them different groups)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --consumer-property group.id=helloGroup --topic hello
# List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --list
# Show a consumer group's offsets
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup
```
## Consumer Groups

Consumers in a group consume according to the group's offsets; that is, all consumers in the group share the group's committed offsets. When a group has no active consumers, new messages sent by producers are retained and consumed once a consumer in that group starts up again.
### Offset Fields

- GROUP: group name
- TOPIC: topic name
- PARTITION: partition number
- CURRENT-OFFSET: the offset up to which the group has consumed the log
- LOG-END-OFFSET: the offset at the end of the log (the high watermark, HW)
- LAG: the number of messages the group has not yet consumed
```shell
# With no consumers in the group, produce two new messages (123 and 456)
bin/kafka-console-producer.sh --broker-list 192.168.1.161:9092 --topic hello
>123
>456
# Describe the group again: LOG-END-OFFSET moved to 8, so LAG (unconsumed messages) is now 2
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup

GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
helloGroup  hello  0          6               8               2    -            -     -

# Start a consumer; it consumes the two messages
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --consumer-property group.id=helloGroup --topic hello
123
456
# Describe the group once more
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup

GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                 HOST            CLIENT-ID
helloGroup  hello  0          8               8               0    consumer-helloGroup-1-2a9923b8-1ca0-48d0-8b59-0c4aeff3cd52  /192.168.1.161  consumer-helloGroup-1
```
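The relationship between these columns can be sketched in a few lines of Java; the class below is purely illustrative, with the numbers taken from the describe output above:

```java
public class LagDemo {

    // LAG is simply the distance between the end of the log and the group's committed offset
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // Before the consumer starts: CURRENT-OFFSET=6, LOG-END-OFFSET=8
        System.out.println(lag(8, 6));  // 2 messages waiting to be consumed
        // After the consumer catches up: CURRENT-OFFSET=8, LOG-END-OFFSET=8
        System.out.println(lag(8, 8));  // 0
    }
}
```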
## Partitions

### Multi-Partition Topics

```shell
# Create a topic with 2 partitions
bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 1 --partitions 2 --topic multi-hello
# Describe the topic
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-hello
Topic: multi-hello  PartitionCount: 2  ReplicationFactor: 1  Configs:
    Topic: multi-hello  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: multi-hello  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
```
Meanwhile, under `/usr/local/data/kafka-logs` you can see two data directories (`multi-hello-0` and `multi-hello-1`).
```shell
# Expand the topic to 3 partitions
bin/kafka-topics.sh --alter --zookeeper 192.168.1.161:2181 --partitions 3 --topic multi-hello
Adding partitions succeeded!
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-hello
Topic: multi-hello  PartitionCount: 3  ReplicationFactor: 1  Configs:
    Topic: multi-hello  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: multi-hello  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
    Topic: multi-hello  Partition: 2  Leader: 0  Replicas: 0  Isr: 0
```
### Why Partition?

Storing a huge volume of data on a single machine inevitably hits capacity limits. Partitioning a topic spreads its data across different machines, and also increases the parallelism of data processing.
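As a rough illustration of this idea (not Kafka's actual storage code), the sketch below round-robins messages over partition buckets, so each bucket holds only a fraction of the data; the class and message names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {

    // Round-robin numMessages over numPartitions buckets; in a real cluster each
    // bucket could live on a different broker
    static List<List<String>> spread(int numMessages, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
        for (int i = 0; i < numMessages; i++) {
            partitions.get(i % numPartitions).add("msg-" + i);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // 9 messages over 3 partitions: each partition stores only a third of the data,
        // and the three partitions can be read and written in parallel
        List<List<String>> partitions = spread(9, 3);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("partition " + i + ": " + partitions.get(i));
        }
    }
}
```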
## Simple Kafka Cluster Setup

### Copy the Configuration Files

```shell
mkdir cluster/
cp config/server.properties cluster/server-1.properties
cp config/server.properties cluster/server-2.properties
cp config/server.properties cluster/server-3.properties
```
### Edit the Configuration Files

```properties
# broker1
broker.id=1
listeners=PLAINTEXT://192.168.1.161:9092
log.dirs=/usr/local/data/kafka-logs-1
# All brokers in the same cluster must use the same ZK connection address
zookeeper.connect=localhost:2181

# broker2
broker.id=2
listeners=PLAINTEXT://192.168.1.161:9093
log.dirs=/usr/local/data/kafka-logs-2
zookeeper.connect=localhost:2181

# broker3
broker.id=3
listeners=PLAINTEXT://192.168.1.161:9094
log.dirs=/usr/local/data/kafka-logs-3
zookeeper.connect=localhost:2181
```
### Start the Cluster

```shell
bin/kafka-server-start.sh -daemon cluster/server-1.properties
bin/kafka-server-start.sh -daemon cluster/server-2.properties
bin/kafka-server-start.sh -daemon cluster/server-3.properties
# Confirm startup by checking the broker ids registered in ZK
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[1, 2, 3]
```
## Partitions in a Cluster

Building on the simple cluster above, let's take a closer look at partitions.
### A Multi-Replica, Multi-Partition Topic

```shell
bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 3 --partitions 2 --topic multi-replicate-partition-topic
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-replicate-partition-topic
Topic: multi-replicate-partition-topic  PartitionCount: 2  ReplicationFactor: 3  Configs:
    Topic: multi-replicate-partition-topic  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
    Topic: multi-replicate-partition-topic  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
```
Notes on the output:

- Leader: the broker.id that handles read and write requests for the partition
- Replicas: the broker.ids that hold a replica of the partition
- Isr: a subset of Replicas, listing only brokers that are alive and fully caught up; the leader is also elected from the Isr list
### Kill the Broker with broker.id=2

```shell
ps -ef | grep cluster/server-2.properties
  501 85803     1   0  3:43PM ttys003    0:27.53 /L...
kill 85803
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-replicate-partition-topic
Topic: multi-replicate-partition-topic  PartitionCount: 2  ReplicationFactor: 3  Configs:
    Topic: multi-replicate-partition-topic  Partition: 0  Leader: 3  Replicas: 2,3,1  Isr: 3,1
    Topic: multi-replicate-partition-topic  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 3,1
```
Notes on the output:

Isr now contains only 3 and 1, while Replicas is unchanged; the leader of partition 0 failed over from broker 2 to broker 3.
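The failover above can be sketched as "pick the first surviving replica that is still in the ISR". This is a deliberate simplification of Kafka's controller logic, and the class below is hypothetical, but the lists mirror the describe output:

```java
import java.util.List;
import java.util.Set;

public class LeaderElection {

    // Simplified: the new leader is the first replica (in replica-list order)
    // that is still a member of the ISR
    static int electLeader(List<Integer> replicas, Set<Integer> isr) {
        for (int broker : replicas) {
            if (isr.contains(broker)) return broker;
        }
        throw new IllegalStateException("no in-sync replica available");
    }

    public static void main(String[] args) {
        // Partition 0 before the failure: Replicas: 2,3,1  Isr: 2,3,1 -> leader 2
        System.out.println(electLeader(List.of(2, 3, 1), Set.of(2, 3, 1)));
        // After broker 2 dies: Replicas: 2,3,1  Isr: 3,1 -> leader fails over to 3
        System.out.println(electLeader(List.of(2, 3, 1), Set.of(3, 1)));
    }
}
```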
### Producers and Partitions

A producer can route messages to partitions either round-robin or by message key.
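Key-based routing can be sketched as follows. Note this is a simplification: Kafka's default partitioner actually applies murmur2 to the serialized key, not `String.hashCode()`, but the principle is the same — the same key always lands on the same partition:

```java
public class KeyPartitioner {

    // Simplified stand-in for key-based routing (Kafka uses murmur2 on the
    // serialized key); masking the sign bit keeps the result non-negative
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-42", 3);
        int p2 = partitionFor("order-42", 3);
        // A given key always maps to the same partition, which is what makes
        // per-key ordering possible
        System.out.println(p1 == p2);  // true
    }
}
```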
### Consumers and Partitions

Within a consumer group, each partition's messages are consumed by only one consumer of that group.
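A simplified sketch of how partitions might be divided among a group's consumers (Kafka's real assignor strategies, such as range or round-robin assignment, are richer; the consumer names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignment {

    // Round-robin partitions over the consumers of one group, so that every
    // partition has exactly one owner within the group
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) assignment.put(consumer, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers in the same group
        System.out.println(assign(List.of("consumer-A", "consumer-B"), 3));
    }
}
```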
### Ordered Consumption

Kafka guarantees ordering only within a single partition; it does not guarantee order across the partitions of a topic.
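The guarantee can be illustrated with a hypothetical interleaving (the record names below are made up): a consumer polling a 2-partition topic may see records from different partitions in any order, but filtering down to one partition recovers the original write order, because offsets within a partition are strictly increasing:

```java
import java.util.ArrayList;
import java.util.List;

public class OrderingDemo {

    // Filter the records a consumer observed down to a single partition
    static List<String> fromPartition(List<String> observed, String prefix) {
        List<String> result = new ArrayList<>();
        for (String record : observed) {
            if (record.startsWith(prefix)) result.add(record);
        }
        return result;
    }

    public static void main(String[] args) {
        // One possible interleaving across partitions p0 and p1
        List<String> observed = List.of("p1-0", "p0-0", "p1-1", "p1-2", "p0-1", "p0-2");
        // Per-partition order is intact even though the global order is arbitrary
        System.out.println(fromPartition(observed, "p0-"));  // [p0-0, p0-1, p0-2]
    }
}
```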
## Java Client

### Add the Dependency

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.2</version>
</dependency>
```
### Producer Demo

```java
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Explicitly target a partition
        int partition = 0;
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(TOPIC_NAME, partition, "EXAMPLE", "Hello Kafka");
        // Synchronous send
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println("topic: " + recordMetadata.topic()
                + ", partition:" + recordMetadata.partition()
                + ", offset:" + recordMetadata.offset());

        // When no partition is specified, it is derived from the key: hash(key) % partitionNum
        producerRecord = new ProducerRecord<>(TOPIC_NAME, "EXAMPLE", "Hello Kafka");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Asynchronous send with a callback
        producer.send(producerRecord, (metadata, exception) -> {
            System.out.println("topic: " + metadata.topic()
                    + ", partition:" + metadata.partition()
                    + ", offset:" + metadata.offset());
            countDownLatch.countDown();
        });
        countDownLatch.await();

        producer.close();
    }
}
```
### Consumer Demo

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit offsets automatically, once per second
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```