Basic Introduction

Kafka is a distributed messaging system written in Scala that supports partitioning and multiple replicas, and it can process large volumes of data in real time to cover a wide range of scenarios. Distributed storage is also the biggest difference between Kafka and RabbitMQ.

Common Terminology

Term            Description
Broker          A Kafka message-service node; the smallest unit that makes up a Kafka cluster
Topic           Used to categorize messages; every message belongs to a Topic
Producer        Message producer; sends messages to Brokers
Consumer        Message consumer; pulls messages from Brokers
ConsumerGroup   The group a consumer belongs to; a message can be consumed by multiple consumer groups, but within each group only one consumer consumes it
Partition       A topic can be split into multiple partitions; messages within a single partition are ordered

Refer to the diagram below to understand these terms:
[Figure: Kafka]

Quick Start

Download and Install

wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zxf kafka_2.12-2.6.2.tgz
cd kafka_2.12-2.6.2

Modify the Configuration

# Edit the configuration
vi config/server.properties
# Configuration notes:
# broker.id must be unique within the Kafka cluster
broker.id=0
# ip:port of the machine where Kafka is deployed
listeners=PLAINTEXT://192.168.1.161:9092
# Path for storing message log files
log.dirs=/usr/local/data/kafka-logs
# ZooKeeper connection address
zookeeper.connect=localhost:2181
# Default number of partitions per topic
num.partitions=1

Start and Stop

# Before starting, inspect the ZK nodes: only the zookeeper node exists
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
# Start in the background
bin/kafka-server-start.sh -daemon config/server.properties
# After starting, inspect the ZK nodes again: Kafka has written a lot of metadata
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
# broker.id is registered in ZK under /brokers/ids
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
# Shut down
bin/kafka-server-stop.sh

Common Commands

# Create a topic; --replication-factor sets the number of replicas, --partitions the number of partitions
bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 1 --partitions 1 --topic hello
# List topics
bin/kafka-topics.sh --list --zookeeper 192.168.1.161:2181
# Delete a topic
bin/kafka-topics.sh --delete --topic hello --zookeeper 192.168.1.161:2181
# Send messages
bin/kafka-console-producer.sh --broker-list 192.168.1.161:9092 --topic hello
# Consume only new messages
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --topic hello
# Consume all messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --from-beginning --topic hello
# Consume multiple topics
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --whitelist "hello|hello1"
# Unicast consumption (give multiple consumers the same consumer group) vs. multicast consumption (give them different groups)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --consumer-property group.id=helloGroup --topic hello
# List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --list
# Show consumer group offsets
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup
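The same operations can also be driven programmatically through the AdminClient that ships with kafka-clients. Below is a minimal sketch, assuming the same broker address as above; the topic name adminDemo is made up for illustration:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create a topic with 1 partition and replication factor 1 (kafka-topics.sh --create)
            admin.createTopics(Collections.singletonList(new NewTopic("adminDemo", 1, (short) 1))).all().get();
            // List topic names (kafka-topics.sh --list)
            System.out.println(admin.listTopics().names().get());
            // Delete the topic (kafka-topics.sh --delete)
            admin.deleteTopics(Collections.singletonList("adminDemo")).all().get();
        }
    }
}

Note that kafka-topics.sh in this version talks to ZooKeeper while AdminClient talks to the brokers directly; the end result is the same.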

Consumer Groups

Consumers in a consumer group consume messages according to the group's committed offsets; in other words, the consumers share the group's offsets. When a group has no active consumers, new messages sent by producers are consumed once a consumer starts up again.
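To make "sharing the group's offsets" concrete: any client can ask the brokers for the offset committed under a group.id. Below is a minimal sketch, assuming the helloGroup group and hello topic from the CLI examples:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("hello", 0);
            // The offset the group last committed for partition 0; any consumer
            // joining the group resumes from this position
            System.out.println(consumer.committed(Collections.singleton(tp)));
        }
    }
}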

Offset Fields Explained

GROUP: consumer group name
TOPIC: topic name
PARTITION: partition number
CURRENT-OFFSET: the offset in the message log up to which the group has consumed
LOG-END-OFFSET: the offset at the end of the message log (the high watermark, HW)
LAG: the number of messages the group has not yet consumed, i.e. LOG-END-OFFSET - CURRENT-OFFSET

# With no consumers in the group, produce two new messages (123 and 456)
bin/kafka-console-producer.sh --broker-list 192.168.1.161:9092 --topic hello
>123
>456
# Describe the group again: LOG-END-OFFSET has grown to 8, and LAG shows 2 unconsumed messages
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
helloGroup      hello           0          6               8               2               -               -               -
# Start a consumer; it consumes the two messages
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.161:9092 --consumer-property group.id=helloGroup --topic hello
123
456
# Describe the group again
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.161:9092 --describe --group helloGroup
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST            CLIENT-ID
helloGroup      hello           0          8               8               0               consumer-helloGroup-1-2a9923b8-1ca0-48d0-8b59-0c4aeff3cd52 /192.168.1.161  consumer-helloGroup-1

Introduction to Partitions

Multi-Partition Topics

# Create a topic with the number of partitions set to 2
bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 1 --partitions 2 --topic multi-hello
# Describe the topic
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-hello
Topic: multi-hello    PartitionCount: 2    ReplicationFactor: 1    Configs:
    Topic: multi-hello    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: multi-hello    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

Meanwhile, under the /usr/local/data/kafka-logs directory you can see two data directories (multi-hello-0 and multi-hello-1).

# Scale out: increase the number of partitions to 3 (the partition count can only be increased, never decreased)
bin/kafka-topics.sh --alter --zookeeper 192.168.1.161:2181 --partitions 3 --topic multi-hello
Adding partitions succeeded!
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-hello
Topic: multi-hello    PartitionCount: 3    ReplicationFactor: 1    Configs:
    Topic: multi-hello    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: multi-hello    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: multi-hello    Partition: 2    Leader: 0    Replicas: 0    Isr: 0

Why Partition?

Storing massive amounts of data on a single machine inevitably runs into capacity limits. Partitioning a Topic spreads its data across different machines and also raises the parallelism of data processing.

Simple Kafka Cluster Setup

Copy the Configuration Files

mkdir cluster/
cp config/server.properties cluster/server-1.properties
cp config/server.properties cluster/server-2.properties
cp config/server.properties cluster/server-3.properties

Modify the Configuration Files

# broker1 (cluster/server-1.properties)
broker.id=1
listeners=PLAINTEXT://192.168.1.161:9092
log.dirs=/usr/local/data/kafka-logs-1
# All Broker instances in the same cluster must use the same ZK connection address
zookeeper.connect=localhost:2181
# broker2 (cluster/server-2.properties)
broker.id=2
listeners=PLAINTEXT://192.168.1.161:9093
log.dirs=/usr/local/data/kafka-logs-2
# All Broker instances in the same cluster must use the same ZK connection address
zookeeper.connect=localhost:2181
# broker3 (cluster/server-3.properties)
broker.id=3
listeners=PLAINTEXT://192.168.1.161:9094
log.dirs=/usr/local/data/kafka-logs-3
# All Broker instances in the same cluster must use the same ZK connection address
zookeeper.connect=localhost:2181

Start the Cluster

bin/kafka-server-start.sh -daemon cluster/server-1.properties
bin/kafka-server-start.sh -daemon cluster/server-2.properties
bin/kafka-server-start.sh -daemon cluster/server-3.properties

# Confirm the brokers started by checking the node info in ZK
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[1, 2, 3]

Partitions in a Cluster

Building on the simple cluster from the previous step, let's take a closer look at partitions.

A Multi-Replica, Multi-Partition Topic

bin/kafka-topics.sh --create --zookeeper 192.168.1.161:2181 --replication-factor 3 --partitions 2 --topic multi-replicate-partition-topic
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-replicate-partition-topic
Topic: multi-replicate-partition-topic    PartitionCount: 2    ReplicationFactor: 3    Configs:
    Topic: multi-replicate-partition-topic    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: multi-replicate-partition-topic    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2

Output notes:
Leader: the broker id that handles read and write requests for the partition
Replicas: the broker.ids that store replicas of the partition
Isr: a subset of Replicas listing only the broker.ids that are alive and fully caught up; the leader is also elected from the Isr list

Stop the Broker with broker.id=2

ps -ef|grep cluster/server-2.properties
501 85803     1   0  3:43PM ttys003    0:27.53 /L...
kill 85803
bin/kafka-topics.sh --describe --zookeeper 192.168.1.161:2181 --topic multi-replicate-partition-topic
Topic: multi-replicate-partition-topic    PartitionCount: 2    ReplicationFactor: 3    Configs:
    Topic: multi-replicate-partition-topic    Partition: 0    Leader: 3    Replicas: 2,3,1    Isr: 3,1
    Topic: multi-replicate-partition-topic    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1

Output notes:
Isr now contains only 3 and 1, while Replicas is unchanged

Producers and Partitions

A producer can route a message to a specific partition either round-robin or by key.
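Beyond the built-in strategies, routing can be fully customized by implementing the kafka-clients Partitioner interface. A minimal sketch, with a made-up rule that pins keys prefixed with "vip" to partition 0:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class VipPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Made-up rule: "vip"-prefixed keys always go to partition 0
        if (key != null && key.toString().startsWith("vip")) {
            return 0;
        }
        // Everything else is spread over the remaining partitions by key hash
        int hash = key == null ? 0 : (key.hashCode() & 0x7fffffff);
        return numPartitions == 1 ? 0 : 1 + hash % (numPartitions - 1);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It is enabled on a producer with properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipPartitioner.class.getName()).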

Consumers and Partitions

A message in a partition can be consumed by only one consumer within a given consumer group.
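A corollary is that a group with more consumers than partitions leaves the extra consumers idle. A consumer can also bypass group assignment entirely and bind itself to one partition with assign(); a minimal sketch against the multi-hello topic created earlier:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Read only partition 0 of multi-hello, skipping group rebalancing;
            // no group.id is needed in this mode
            consumer.assign(Collections.singletonList(new TopicPartition("multi-hello", 0)));
            consumer.seekToBeginning(consumer.assignment());
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}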

Ordered Consumption

Kafka only guarantees ordered consumption within a single partition; it does not guarantee ordering across the partitions of a topic.
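A practical corollary: to keep related messages ordered, give them the same key so they all land in the same partition. A small sketch, reusing a producer configured as in the Producer Demo below, with a made-up order id as the key:

// All events for the same order share one key, so they map to one
// partition and are read back in the order they were sent
producer.send(new ProducerRecord<>("multi-hello", "order-1001", "created"));
producer.send(new ProducerRecord<>("multi-hello", "order-1001", "paid"));
producer.send(new ProducerRecord<>("multi-hello", "order-1001", "shipped"));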

Java Client Access

Add the Dependency

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.6.2</version>
</dependency>

Producer Demo

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // Explicitly specify the target partition
        int partition = 0;
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
            partition, "EXAMPLE", "Hello Kafka");
        // Synchronous send: block until the broker acknowledges
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println("topic: " + recordMetadata.topic() + ", partition:" + recordMetadata.partition() + ", offset:" + recordMetadata.offset());

        // When no partition is specified, it is computed from the key: hash(key) % partitionNum
        producerRecord = new ProducerRecord<>(TOPIC_NAME, "EXAMPLE", "Hello Kafka");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Asynchronous send with a callback
        producer.send(producerRecord, (metadata, exception) -> {
            System.out.println("topic: " + metadata.topic() + ", partition:" + metadata.partition() + ", offset:" + metadata.offset());
            countDownLatch.countDown();
        });
        countDownLatch.await();
        producer.close();
    }
}

Consumer Demo

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    private final static String TOPIC_NAME = "multi-replicate-partition-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.161:9092,192.168.1.161:9093,192.168.1.161:9094");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Offsets are committed automatically every 1000 ms
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
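The demo above relies on auto-commit, so offsets are committed in the background every second, which can redeliver (or skip) a few messages around a crash. A common variation, sketched here as a drop-in change to the demo, disables auto-commit and commits after each processed batch:

// Replace the auto-commit settings with:
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// ...and commit manually at the end of the poll loop:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    if (!records.isEmpty()) {
        // Offsets are committed only after the whole batch has been processed
        consumer.commitSync();
    }
}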