业务场景介绍
场景描述
当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。而电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。而如何高效快捷地统计这些指标呢?假设平台已经将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。而我们需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。
场景方案
场景任务
使用DLI Flink完成电商业务实时数据的分析处理,获取各个渠道的销售汇总数据。
数据说明
数据源表:电商业务订单详情宽表
结果表:各渠道的销售总额实时统计表
操作过程
创建资源
在账户下创建作业需要的相关资源,涉及VPC、DMS、DLI、RDS。
获取DMS连接地址并创建Topic
获取DMS Kafka实例连接地址并创建DMSTopic。
创建RDS数据库表
获取RDS实例内网地址,登录RDS实例创建RDS数据库及MySQL表。
创建DLI增强型跨源
创建DLI增强型跨源,并测试队列与RDS、DMS实例连通性。
创建并提交Flink作业
创建DLI Flink OpenSource SQL作业并运行。
查询结果
查询Flink作业结果,使用DLV进行大屏展示。
实操过程
获取 DMS 连接地址并创建 Topic
在控制台单击“服务列表”,选择“分布式消息服务DMS”,单击进入DMS服务
控制台页面
进入实例详情页面。单击“基本信息”,获取“连接地址”
单击“Topic管理”,创建一个Topic:trade_order_detail_info。
创建 RDS 数据库表
在控制台单击“服务列表”,选择“云数据库RDS”,单击进入RDS页面。在“实例管理页面”,找到已经创建的RDS实例,获取其内网地址。
单击所创建RDS实例的“登录”,跳转至“数据管理服务-DAS”。输入相关账户信息,单击“测试连接”。显示连接成功后,单击“登录”,进入“实例登录”页面。
登录RDS实例后,单击“新建数据库”,创建名称为“dli-demo”的数据库。
单击“SQL操作”>“SQL查询”,执行如下SQL创建测试用MySQL表
DROP TABLE `dli-demo`.`trade_channel_collect`;
CREATE TABLE `dli-demo`.`trade_channel_collect` (
`begin_time` VARCHAR(32) NOT NULL,
`channel_code` VARCHAR(32) NOT NULL,
`channel_name` VARCHAR(32) NULL,
`cur_gmv` DOUBLE UNSIGNED NULL,
`cur_order_user_count` BIGINT UNSIGNED NULL,
`cur_order_count` BIGINT UNSIGNED NULL,
`last_pay_time` VARCHAR(32) NULL,
˺ē²nÀȎ”ñààžnìȎì²mž˺ VARCHAR(32) NULL,
PRIMARY KEY (`begin_time`, `channel_code`)
) ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci
COMMENT = '各渠道的销售总额实时统计';
创建 DLI 增强型跨源
在控制台单击“服务列表”,选择“数据湖探索”,单击进入DLI服务页面。单击“队列管理”,在队列列表中您所创建的通用队列
单击“全局配置”>“服务授权”,选中“VPC Administrator”,单击“更新委托权限”,赋予DLI操作用户VPC资源的权限,用于创建VPC的“对等连接”
单击“跨源连接”>“增强型跨源”>“创建”
配置如下:
绑定队列:选择您所创建的通用队列。
虚拟私有云:选择 Kafka 与 MySQL 实例所在的VPC。
子网:选择 Kafka 与 MySQL 实例所在的子网。
创建增强型跨源
增强型跨源创建完成后,在跨源列表中,对应的跨源连接状态会显示为“已激活”。
单击跨源连接的名称,详情页面显示连接状态为“ACTIVE”。
测试队列与RDS、DMS实例连通性
单击“队列管理”,选择您所使用的队列,单击“操作”列中的“更多”>“测试地址连通性”。
输入前序步骤3-2获取的DMS Kafka实例连接地址和步骤4-2获取的RDS
MySQL实例内网地址,进行网络连通性测试。
测试结果显示可达,则DLI队列与Kafka、MySQL实例的网络已经联通。
如果测试结果不可达,需要修改实例所在VPC的安全组规则,放开9092、3306端口对DLI队列的限制,DLI队列网段信息可以在队列的详情页中获取。
创建并提交 Flink 作业
单击DLI控制台左侧“作业管理”,选择“Flink作业”。单击“创建作业”。
类型:选择作业类型为:Flink OpenSource SQL。
名称:自定义。
单击“确定”,进入作业编辑作业页面,具体SQL示例如下,部分参数值需要根据RDS和DMS对应的信息进行修改。
--********************************************************************--
-- 数据源:trade_order_detail_info (订单详情宽表)
--********************************************************************--
create table trade_order_detail (
order_id string, -- 订单ID
order_channel string, -- 渠道
order_time string, -- 订单创建时间
pay_amount double, -- 订单金额
real_pay double, -- 实际付费金额
pay_time string, -- 付费时间
user_id string, -- 用户ID
user_name string, -- 用户名
area_id string -- 地区ID
) with (
"connector.type" = "kafka",
"connector.version" = "0.10",
"connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka连接地址
"connector.properties.group.id" = "trade_order", -- Kafka groupID
"connector.topic" = "trade_order_detail_info", -- Kafka topic
"format.type" = "json",
"connector.startup-mode" = ȊÆìžäìȝÑčäžìȊ
);
--********************************************************************--
-- 结果表:trade_channel_collect (各渠道的销售总额实时统计)
--********************************************************************--
create table trade_channel_collect(
begin_time string, --统计数据的开始时间
channel_code string, -- 渠道编号
channel_name string, -- 渠道名
cur_gmv double, -- 当天GMV
cur_order_user_count bigint, -- 当天付款人数
cur_order_count bigint, -- 当天付款订单数
last_pay_time string, -- 最近结算时间
ē²nÀȎ”ñààžnìȎì²mž string,
primary key (begin_time, channel_code) not enforced
) with (
"connector.type" = "jdbc",
"connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql连接地址,jdbc格式
"connector.table" = "xxxx", -- mysql表名
"connector.driver" = "com.mysql.jdbc.Driver",
"connector.username" = "xxx", -- mysql用户名
"connector.password" = "xxxx", -- mysql密码
Ȋ”Ñnnž”ìÑàȇwà²ìžȇēñä¯ȇm†ĂȝàÑwäȊ = "1000",
Ȋ”Ñnnž”ìÑàȇwà²ìžȇēñä¯ȇ²nìžàv†ÃȊ = "1s"
);
--********************************************************************--
-- 临时中间表
--********************************************************************--
create view tmp_order_detail
as
select *
, case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other"
else t.order_channel end as channel_code --重新定义统计渠道 只有四个枚举值[webShop、
appShop、miniAppShop、other]
, case when t.order_channel = "webShop" then _UTF16"网页商城"
when t.order_channel = "appShop" then _UTF16"app商城"
when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
else _UTF16"其他" end as channel_name --渠道名称
from (
select *
, row_number() over(partition by order_id order by order_time desc ) as rn --去除重复订单数据
, concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
, concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
from trade_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天数据,为了方
便运行,这里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string)
and real_pay is not null
) t
where t.rn = 1;
-- 按渠道统计各个指标
insert into trade_channel_collect
select
begin_time --统计数据的开始时间
, channel_code
, channel_name
, cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --当天GMV
, count(distinct user_id) as cur_order_user_count --当天付款人数
, count(1) as cur_order_count --当天付款订单数
, max(pay_time) as last_pay_time --最近结算时间
, cast(LOCALTIMESTAMP as string) as ē²nÀȎ”ñààžnìȎì²mž ȝȝē²nÀ任务中的当前时间
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;
作业逻辑说明如下:
- 创建一个Kafka源表,用来从Kafka指定Topic中读取消费数据;
- 创建一个结果表,用来通过JDBC向MySQL中写入结果数据。
- 实现相应的处理逻辑,以实现各个指标的统计。
为了简化最终的处理逻辑,使用创建视图进行数据预处理。 - 利用over窗口条件和过滤条件结合以去除重复数据(该方式是利用了top N的方
法),同时利用相应的内置函数concat和substr将当天的00:00:00作为统计的开始
时间,当天的23:59:59作为统计结束时间,并筛选出支付时间在当天凌晨00:00:00
后的订单数据进行统计(为了方便模拟数据的构造,这里使用"2021-03-25
12:03:00"替代cast(LOCALTIMESTAMP as string))。 - 根据这些数据的订单渠道利用内置的条件函数设置channel_code和channel_name
的值,从而获取了源表中的字段信息,以及begin_time、end_time和
channel_code、channel_name的值。 - 根据需要对相应指标进行统计和筛选,并将结果写入到结果表中。
选择所创建的DLI通用队列提交作业。
Flink Opensource SQL 作业
等待作业状态会变为“运行中”,单击作业名称,可以查看作业详细运行情况。
作业运行状态
使用Kafka客户端向指定topic发送数据,模拟实时数据流。
发送命令如下:
sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka连接地址 --topic 指定topic
示例数据如下:
{
"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00",
"pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001",
"user_name":"Alice", "area_id":"330106"}
{
"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06",
"pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001",
"user_name":"Alice", "area_id":"330106"}
{
"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00",
"user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{
"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05",
"pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003",
"user_name":"Cindy", "area_id":"330108"}
{
"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20",
"pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004",
"user_name":"Daisy", "area_id":"330102"}
{
"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08",
"pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004",
"user_name":"Daisy", "area_id":"330102"}
{
"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13",
"pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004",
"user_name":"Daisy", "area_id":"330102"}
{
"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06",
"pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001",
"user_name":"Alice", "area_id":"330106"}
{
"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06",
"pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002",
"user_name":"Bob", "area_id":"330110"}
{
"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00",
"user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{
"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06",
"pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004",
"user_name":"Daisy", "area_id":"330102"}
单击DLI控制台左侧“作业管理”>“Flink作业”,单击提交的Flink作业。在作业详情页面,可以看到处理的数据记录数。
查询结果
登录MySQL实例,执行如下SQL语句,即可查询到经过Flink作业处理后的结果数据
SELECT * FROM 'dli-demo','trade_channel_collect';
配置DLV大屏,执行SQL查询RDS MySQL,即可以实现大屏实时展示