进入mysql,输入
show variables like 'bin_log'
查看是否开启binlog如果没有,则在
/etc/my.cnf
文件中添加如下内容[mysqld] server-id= 1 log-bin= mysql-bin binlog_format= row
重启mysql,
sudo service mysql restart
输入
show variables like 'bin_log'
查看是否开启binlog创建一个文件夹存放canal,
mkdir /opt/module/canal
创建一个Maven项目,并且添加Scala依赖,pom文件如下
<dependencies> <!--canal 客户端, 从 canal 服务器读取数据--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <!-- kafka 客户端 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies>
创建两个单例类
CanalHandler
和CanalClient
,代码如下import java.util import com.alibaba.otter.canal.protocol.CanalEntry import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData} object CanalHandler { /** * 处理从 canal 取来的数据 * * @param tableName 表名 * @param eventType 事件类型 * @param rowDataList 数据类别 */ def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = { import scala.collection.JavaConversions._ if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) { // 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据 for (rowData <- rowDataList) { // 2. 得到每行中, 所有列组成的列表 val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList for (column <- columnList) { // 3. 得到列名和列值 println(column.getName + ":" + column.getValue) } } } } }
import java.net.InetSocketAddress import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors} import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange} import com.alibaba.otter.canal.protocol.{CanalEntry, Message} import com.google.protobuf.ByteString object CanalClient { def main(args: Array[String]): Unit = { // 1. 创建能连接到 Canal 的连接器对象 val connector: CanalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop101", 11111), "example", "", "") // 2. 连接到 Canal connector.connect() // 3. 监控指定的表的数据的变化 connector.subscribe("gmall.order_info") while (true) { // 4. 获取消息 (一个消息对应 多条sql 语句的执行) val msg: Message = connector.get(100) // 一次最多获取 100 条 sql // 5. 个消息对应多行数据发生了变化, 一个 entry 表示一条 sql 语句的执行 val entries: java.util.List[CanalEntry.Entry] = msg.getEntries import scala.collection.JavaConversions._ if (entries.size() > 0) { // 6. 遍历每行数据 for (entry <- entries) { // 7. EntryType.ROWDATA 只对这样的 EntryType 做处理 if (entry.getEntryType == EntryType.ROWDATA) { // 8. 获取到这行数据, 但是这种数据不是字符串, 所以要解析 val value: ByteString = entry.getStoreValue val rowChange: RowChange = RowChange.parseFrom(value) // 9.定义专门处理的工具类: 参数 1 表名, 参数 2 事件类型(插入, 删除等), 参数 3: 具体的数据 CanalHandler.handle(entry.getHeader.getTableName, rowChange.getEventType, rowChange.getRowDatasList) } } } else { println("没有抓取到数据...., 2s 之后重新抓取") Thread.sleep(2000) } } } }
向
gmall.order_info
中插入数据,可以看到控制台打印出日志id:1 consignee:UHBKVw consignee_tel:13492323624 total_amount:530.0 order_status:1 user_id:26 payment_way:1 delivery_address:jEsdlXeEICMeiFCVWXUI order_comment:JhSSLZQfcVvquYLHeyff out_trade_no:2866414735 trade_body: create_time:2021-05-09 19:30:09 operate_time: expire_time: tracking_no: parent_order_id: img_url: id:2 consignee:QJWJqK consignee_tel:13585089108 total_amount:249.0 order_status:2 user_id:73 payment_way:1 delivery_address:QBkurDQvEIFZgKgyaIov order_comment:nanqMPOyaJsMFQpdtJor out_trade_no:8132263655 trade_body: create_time:2021-05-09 05:47:49 operate_time:2021-05-09 06:16:19 expire_time: tracking_no: parent_order_id: img_url: 没有抓取到数据...., 2s 之后重新抓取 没有抓取到数据...., 2s 之后重新抓取 没有抓取到数据...., 2s 之后重新抓取
配置好Kafka之后即可将数据生产到Kafka中