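Background
In a business system, data often has to move between several data stores. For example, data from the online system flows into analytics systems, stream-processing systems, search engines, caches, and event-processing systems.
As shown in the figure: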
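Debezium
Official site: https://debezium.io/
Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). Once Debezium is installed and configured to monitor a database, applications can consume row-level changes in real time. As a distributed system, Debezium is also fault tolerant.
Debezium sources (the databases it can monitor): MySQL, MongoDB, PostgreSQL, Oracle, SQL Server
Debezium sink (where the captured data is delivered): Kafka
Debezium use cases: real-time data synchronization, real-time data consumption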
Pulsar IO
Pulsar integrates with Debezium through the built-in connectors of the Pulsar IO framework (https://pulsar.apache.org/docs/en/io-connectors), which makes synchronizing and consuming data in real time simpler.
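Getting started
Start MySQL
Start MySQL with the binlog enabled. On Windows, edit my.ini (on Linux, my.cnf) and add the following settings:
[mysqld]
log-bin=mysql-bin    # adding this line enables the binlog
binlog-format=ROW    # use row-based logging
server_id=1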
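Start Pulsar standalone
Download Pulsar (version 2.4.0 at the time of writing). The URL is quoted so that the shell does not interpret the & character:
$ wget -O apache-pulsar-2.4.0-bin.tar.gz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz'
Extract the archive:
$ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz
Start Pulsar standalone:
$ cd apache-pulsar-2.4.0
$ bin/pulsar standalone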
Debezium connector
Create the connectors directory:
$ mkdir connectors
Download pulsar-io-debezium-mysql-2.4.0.nar:
$ cd connectors
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/pulsar/pulsar-2.4.0/connectors/pulsar-io-debezium-mysql-2.4.0.nar
Create the configuration file debezium-mysql-source-config.yaml in the same directory:
$ vim debezium-mysql-source-config.yaml
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-2.4.0.nar"
parallelism: 1
configs:
  ## config for mysql, docker image: debezium/example-mysql:0.8
  database.hostname: "<MySQL IP address>"
  database.port: "<MySQL port>"
  database.user: "username"
  database.password: "password"
  # Identifier of the connector; must be unique within the database cluster, similar to the database's server-id setting.
  database.server.id: "184054"
  # Logical name of the database server/cluster; used as the prefix of every topic name the connector writes to.
  database.server.name: "dbserver1"
  # Time zone; required.
  database.serverTimezone: "UTC"
  # Matches the names of the databases to monitor.
  database.whitelist: "inventory"
  # Matches the names of the databases to exclude from monitoring.
  database.blacklist: ""
  # Matches the tables to monitor; each identifier has the form databaseName.tableName.
  table.whitelist: ""
  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
  database.history.pulsar.topic: "history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  ## PULSAR_SERVICE_URL_CONFIG
  pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## OFFSET_STORAGE_TOPIC_CONFIG
  offset.storage.topic: "offset-topic"
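Start the Debezium connector. Run the command from the Pulsar root directory, since both the archive path in the configuration and the path below are relative to it:
$ cd ..
$ bin/pulsar-admin source localrun --source-config-file connectors/debezium-mysql-source-config.yaml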
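Subscribe to the Pulsar topic to watch MySQL data changes
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.KeyValue;

public class BinlogConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://<ip-address>:6650").build();
        // Pulsar supports three ways to subscribe: a single topic, a list of topics,
        // or a topic pattern. This example uses a list of topics.
        List<String> topics = new ArrayList<>();
        topics.add("dbserver1.<database-name>.<table-name>");
        Consumer<KeyValue<byte[], byte[]>> consumer = pulsarClient
                .newConsumer(Schema.KeyValue(Schema.BYTES, Schema.BYTES))
                .topics(topics)
                .subscriptionName("sub-products")
                .subscribe();
        // Key and value are JSON documents written by JsonConverter; decode each into a Map.
        JSONSchema<Map> jsonSchema = JSONSchema.of(SchemaDefinition.<Map>builder().withPojo(Map.class).build());
        while (true) {
            Message<KeyValue<byte[], byte[]>> msg = consumer.receive();
            KeyValue<byte[], byte[]> keyValues = msg.getValue();
            Map keyResult = jsonSchema.decode(keyValues.getKey());
            Map valueResult = jsonSchema.decode(keyValues.getValue());
            // TODO: parse the binlog event
            consumer.acknowledge(msg);
        }
    }
}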
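The topic name in the consumer above combines the database.server.name value from the connector configuration with the database and table names. As a minimal sketch, assuming the "<server name>.<database>.<table>" layout seen in that example holds for every monitored table, a small helper can build the names to subscribe to. The class and method names here are hypothetical and are not part of the Pulsar or Debezium APIs.

```java
// Sketch: build the per-table topic name a consumer subscribes to.
// Assumption: topics are named "<database.server.name>.<database>.<table>",
// matching the "dbserver1.<database-name>.<table-name>" example in the consumer code.
final class DebeziumTopics {
    private DebeziumTopics() {}

    /** Joins the three parts with dots and rejects null or empty parts. */
    static String tableTopic(String serverName, String database, String table) {
        for (String part : new String[] {serverName, database, table}) {
            if (part == null || part.isEmpty()) {
                throw new IllegalArgumentException("topic parts must be non-empty");
            }
        }
        return serverName + "." + database + "." + table;
    }
}
```

With the configuration shown earlier, DebeziumTopics.tableTopic("dbserver1", "inventory", "products") yields the name to pass to topics.add(...).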
A note on the payload field of the binlog event that the client consumes:
"payload": {
  "before": null,
  "after": {
    "id": 1,
    "userName": "userName",
    "passWord": "123456",
    "user_sex": "1",
    "nick_name": "昵称"
  },
  "source": {
    "version": "0.9.2.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000002",
    "pos": 55725,
    "row": 0,
    "snapshot": true,
    "thread": null,
    "db": "test",
    "table": "users",
    "query": null
  },
  "op": "c",
  "ts_ms": 1563096917451
}
op: the operation performed on the MySQL data: c for create (insert), u for update, d for delete, and r for read
before, after: the row data before and after the operation, respectively
source: a structure holding metadata about the origin of the event
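When handling these events in the consumer, it can help to turn the single-letter op code into a typed value early. The following is a minimal sketch, assuming only the four codes listed above. The ChangeOp enum and its methods are hypothetical names introduced for illustration; they are not part of Debezium or Pulsar.

```java
import java.util.Map;

// Sketch: typed view of the "op" field in a Debezium payload.
// Assumption: only the codes c, u, d and r described above occur.
enum ChangeOp {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    /** Maps the single-letter code from the payload to an enum constant. */
    static ChangeOp fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("unknown op code: " + code);
    }

    /** Picks the row image that carries data: "before" for a delete, "after" otherwise. */
    Map<String, Object> rowImage(Map<String, Object> before, Map<String, Object> after) {
        return this == DELETE ? before : after;
    }
}
```

For the sample payload above, ChangeOp.fromCode("c") returns CREATE, and rowImage selects the after map because before is null for an insert.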
Modify MySQL data
mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
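Each UPDATE above should reach the consumer as an event with op set to u and both before and after populated. As a rough sketch of what a consumer might do next, the helper below compares the two row images and returns only the columns whose values changed. It assumes both images have already been decoded into maps, as in the consumer code. RowDiff and changedColumns are hypothetical names, not library APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Sketch: find the columns that differ between the "before" and "after" row images.
// Assumption: both images were decoded from the payload into column-name -> value maps.
final class RowDiff {
    private RowDiff() {}

    /** Returns column -> new value for every column that is new or changed in "after". */
    static Map<String, Object> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> changed = new LinkedHashMap<>();
        if (after == null) {
            return changed; // delete event: there is no new row image
        }
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            boolean existed = before != null && before.containsKey(entry.getKey());
            Object oldValue = existed ? before.get(entry.getKey()) : null;
            if (!existed || !Objects.equals(oldValue, entry.getValue())) {
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }
}
```

For the update of the row with id 101, the result would contain just the name column with its new value 1111111111, provided the previous name was different.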
References
https://pulsar.apache.org/docs/en/io-cdc-debezium/
https://debezium.io/docs/connectors/mysql/