数据仓库
为什么要分层?
复杂问题简单化:将复杂任务分解成多层,每层只处理简单的任务,方便定位问题
减少重复开发:通过中间层数据,能减少重复计算,增加一次计算结果的复用性
隔离原始数据:过滤异常数据,对数据脱敏,使真实数据与统计数据解耦
关系建模与维度建模
关系模型
- 严格遵守第三范式,数据冗余程度低,数据的一致性容易得到保证
- 数据分布于众多的表中,查询会相对复杂
- 在大数据的场景下,查询效率相对较低
维度模型
- 以数据分析作为出发点,不遵循三范式,故数据存在一定的冗余
- 面向业务,将业务用事实表和维度表呈现出来
- 表结构简单,故查询简单,查询效率较高
星型模型和雪花模型
区别
-
主要在于维度的层级。标准星型模型维度只有一层,而雪花模型可能会涉及多层
-
雪花模型比较靠近3NF,但无法完全遵守,性能成本太高
-
星座模型的区别是事实表的数量,有多张事实表
模型选择
- 判断性能优先还是灵活优先
维度表和事实表
维度表
- 一般是对事实的描述信息。每一张维度表对应现实世界中的一个对象或者概念。 例如:用户、商品、日期、地区等
- 特征
- 维度表的范围很宽(具有多个属性、列比较多)
- 跟事实表相比,行数相对较小,通常< 10万行
- 内容相对固定:编码表,如时间维度表
事实表
-
每行数据代表一个业务事件。例如:下单、支付、退款、评价等
-
每一个事实表的行包括
- 具有可加性的数值型的度量值(可统计次数、个数、金额等)
- 与维度表相连接的外键,通常具有两个和两个以上的外键
-
分类
- 事务型事实表
- 以每个事务或事件为单位。例如一个销售订单记录,一笔支付记录等,作为事实表里的一行数据
- 一旦事务被提交,事实表数据被插入,数据就不再进行更改
- 更新方式:增量更新
- 周期型快照事实表
- 不会保留所有数据,只保留固定时间间隔的数据。例如每天或者每月的销售额,或每月的账户余额等
- 更新方式:全量更新
- 累积型快照事实表
- **用于跟踪业务事实的变化。**例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据,来跟踪订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新
- 事务型事实表
数仓分层
ODS
- HDFS用户行为数据、HDFS业务数据
- 规划处理
- 保持数据原貌不做任何修改,起到备份数据的作用
- 数据采用压缩,减少磁盘存储空间
- 创建分区表,防止后续的全表扫描
DIM与DWD(以业务过程为驱动)
- 构建维度模型,一般采用星型模型,呈现的状态一般为星座模型
- 维度建模步骤
- 选择业务过程:一条业务线对应一张事实表,如下单、支付、退款、物流等
- 声明粒度:定义事实表中的一行数据表示什么,尽可能选择最小粒度
- 确认维度:确定事实表中的维度外键,依据是如何描述业务,“谁,何时,做何事”
- 确认事实:确认事实表中的度量值
DWS与DWT(以需求为驱动)
- 确定要建哪些宽表:以维度为基准
- 确定宽表中的字段:站在不同维度去看事实表,重点关注事实表聚合后的度量值
- DWS:存放的所有主题对象当天的汇总行为,例如每个地区当天的下单次数,下单金额等
- DWT:存放的是所有主题对象的累积行为,例如每个地区最近7天(15天、30天、60天)的下单次数、下单金额等
ADS(以需求为驱动)
- 对电商系统各大主题指标分别进行分析
- 访客页面跳转路径
- 用户变动
- 用户行为漏斗
- 用户留存率
- 商品统计
- 品牌复购率
- 订单统计
- 各地区订单统计
- 优惠券统计
- 活动统计
Hadoop
HDFS读写
HDFS读
- 客户端通过 DistributedFileSystem 向 NN 请求下载文件
- NN 通过查询元数据,找到存储有目标文件块的DN地址,返回用于读取的输入流对象
- Client 根据就近原则、负载均衡,挑选一台DN建立连接,读取文件
- DN开始传输数据给Client,从磁盘里读取数据输入流,一个数据块读完后,输入流将获取下一个数据块的位置信息,建立连接,继续读取
- 读取过程中,以Packet为单位来做校验。如遇异常,客户端向NN汇报,并从其他DN继续读取
- Client 以Packet为单位接收,先在本地内存缓存,然后写入目标文件磁盘
- 所有数据块读取完成后,客户端关闭输入流
HDFS写
- Client 向 NN 请求上传文件
- NN 检查 Client 权限、检查目标文件是否存在、父目录是否存在。检查通过则返回输出流对象,操作写入EditLog
- NN 响应 Client 是否可以上传文件
- Client 切分文件块,默认128M。向NN请求上传第一个Block,询问上传到哪几个DN
- NN 进行副本存储结果选择,返回DN1、DN2、DN3,表示采用这三个DN存储数据
- DN1:本地节点
- DN2:其他机架随机一个节点
- DN3:DN2所在机架随机另一个节点
- Client 向DN1请求建立Block传输通道,DN1向DN2请求,DN2向DN3请求,将通信管道建立完成
- DN1、DN2、DN3逐级应答 Client
- Client 开始传输数据
- 先从磁盘读取数据放到本地内存缓存,准备传输第一个Block
- 起始有一个Chunk大小的buf,当Chunk被填满后,计算Chunksum值进行校验,然后填塞入Packet
- 当Packet被填满后,将这个Packet放入应答队列中等待应答
- 进入应答队列的Packet回被另一线程按序取出,发送到下一个DN
- DN每收到一个Packet就传给下一个DN,DN1传给DN2,DN2传给DN3
- 当第一个Block传输完成后,Client 再次请求NN上传第二个Block(重复3-7)
- 所有数据块写完后,客户端关闭输出流
Block、Packet、Chunk
Block:最大的单位,是最终存储在DN上的数据粒度,由
dfs.block.size
参数决定,默认128M,取决于客户端配置Packet:中等的单位,是数据流向DN过程中的数据粒度,由
dfs.write.packet.size
参数决定,默认64k,以这个值为参考动态调整Chunk:最小的单位,是数据流向DN过程中进行数据校验的数据粒度,由
io.byte.per.checksum
参数决定,默认512Byte注意:事实上Chunk还包含一个4Byte的校验值,因此Chunk写入Packet时是516Byte
数据与校验值的比例为128:1,所以一个128M的Block会有一个1M的校验文件
MapReduce流程
Map阶段
- split :对输入文件切片,默认等于数据块大小。切片数量决定并发的MapTask数量
- map :按行读取数据,调用map函数,形成k-v键值对输出
Shuffle(Map端)
- input :将map的输出写入环形缓冲区(默认100M)
- partition :对map结果进行分区,分区规则为对key进行Hash分区
- spill—sort、combine :对不同分区数据进行快速排序,如果有设置combiner,则进行聚合。当缓冲区达到阈值(默认0.8)后,将文件溢写到磁盘
- merge :当溢写小文件超过一定数量(默认3)进行合并,归并排序
Shuffle(Reduce端)
- copy :按照不同分区,从每个map拉取输出文件数据到不同reduce节点上。数据量达到缓冲区阈值时写入磁盘,同样进行partition、combine、排序过程
- merge :不断将溢写文件进行归并排序,最终采用堆排序算法合并,成为reduce输入文件
Reduce阶段
- reduce :调用reduce函数,一个reducer对应一个文件输出
Yarn工作机制
- MR程序提交到客户端所在节点,
jar
包main
方法中执行job.waitForCompletion()
,启动YarnRunner - YarnRunner 向 ResourceManager 申请一个Application
- ResourceManager 将该MR程序的资源路径返回给 YarnRunner :
hdfs://.../staging
以及application_id
- YarnRunner 将运行所需资源提交到HDFS上(
job.submit()
后生成的Job.split
Job.xml
wc.jar
) - 资源提交完毕后,YarnRunner 向 ResourceManager 申请运行 mrAppMaster
- ResourceManager 将用户请求初始化成一个 Task ,放入FIFO调度队列中等待
- 分配一个较空闲的 NodeManager1 从 ResourceManager 领取 Task 任务
- NodeManager1 创建容器 Container ,并产生 MrAppMaster
- NodeManager1 从 HDFS 上拷贝
Job.split
Job.xml
到本地 - NodeManager1 上 MrAppMaster 向 ResourceManager 申请容器运行 MapTask
- 根据
Job.split
切片数量,分配相应数量的 NodeManager2 、NodeManager3 ,分别领取 Task 任务并创建容器 Container (也可以是同一个 NodeManager 创建两个 Container ) - NodeManager1 上 MrAppMaster 向接收到到任务的 NodeManager2 、NodeManager3 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask ,并对结果数据分区排序
- NodeManager1 上 MrAppMaster 等待所有 MapTask 运行完毕后,向 ResourceManager 申请容器运行 ReduceTask
- 根据不同业务需求设置的 Reducer 数量,启动相应的容器,领取 ReduceTask,并向 MapTask 获取相应分区的数据
- 程序运行完毕后,NodeManager1 上 MrAppMaster 会向 ResourceManager 申请注销自己
容量调度器和公平调度器
共同特点
- 多队列
- 每个队列可配置一定的资源量,每个队列采用FIFO调度策略
- 容量保证
- 管理员可为每个队列设置资源最低保证和资源使用上限
- 灵活性
- 如果一个队列中的资源有剩余,可以暂时共享给其他需要资源的队列
- 一旦该队列中有新的应用程序提交,则借调的资源会归还给该队列
- 多租户
- 支持多用户共享集群和或应用程序同时运行
- 为防止同一个用户的作业独占队列中资源,对同一用户提交的作业最大所占资源量进行限定
不同点
- 核心调度策略不同
- 容量调度器:优先选择资源利用率低的队列
- 公平调度器:优先选择对资源的缺额比例大的(缺额:平均分配应得的资源-实际拥有的资源)
- 每个队列可以单独设置资源分配方式
- 容量调度器:FIFO、DRF
- 公平调度器:FIFO、DRF、FAIR
Hive
内部表和外部表的区别
- 创建
- 内部表可以直接创建,外部表需要添加
EXTERNAL
关键字修饰
- 内部表可以直接创建,外部表需要添加
- 默认仓库存储路径
- 内部表:默认创建在
hdfs://uesr/hive/warehouse
下 - 外部表:由LOCATION指明,若不指明,则也在
hdfs://uesr/hive/warehouse
下以外部表的表名创建一个文件夹
- 内部表:默认创建在
- 删除
- 内部表:将元数据和路径下的文件都删除
- 外部表:只删除元数据,不删除路径下的文件
- 加载数据
- 内部表:会把数据移动到自己指定的路径下
- 外部表:不会把数据移动到自己指定的路径下
应用
- 内部表
- ETL处理时,选择内部表做中间表,清理时将HDFS上的文件同时删除
- 对数据的处理都是由HQL语句完成,可使用内部表
- 外部表
- 怕误删数据的情况,不担心数据损坏
- 对数据的处理由HQL和其他工具一同处理时,使用外部表
- 不用加载数据到Hive ,减少数据传输,还能共享
Hive文件存储格式
行列存储比较
行存储的特点
- 查询满足条件的一整行数据时
- 列存储则需要去每个聚集的字段找到对应的每个列的值
- 行存储只需要找到其中一个值,其余的值都在相邻地方
- 此时行存储查询的速度更快
- TEXTFILE、SEQUENCEFILE
列存储的特点
- 查询只需要少数几个字段时
- 列存储每个字段的数据聚集存储,大大减少读取数据量
- 列存储每个字段的数据类型相同,可以针对性地设计更好的压缩算法
- ORC、PARQUET
TEXTFILE
-
默认格式
-
数据不做压缩,磁盘开销大,数据解析开销大
-
可结合 Gzip、Bzip2 使用,
- 但使用 Gzip 这种方式,Hive 不会对数据进行切分,从而无法对数据进行并行操作
Optimized Row Columnar(ORC)
-
Hive 0.11 版里引入的新的存储格式
-
每个 Orc 文件由 1 个或多个 Stripe 组成(Stripe 一般为HDFS块大小),每个 Stripe 包含多条记录(记录按照列进行独立存储,对应 PARQUET 中的 row group)
-
每个 Stripe 里有三部分组成: Index Data,Row Data,Stripe Foote
-
Index Data:一个轻量级的 index。默认是每隔 1W 行做一个索引。这里做的索引只是记录某行的各字段在 Row Data 中的 offset
-
Row Data:存的是具体的数据。先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储
-
Stripe Footer:存的是各个 Stream 的类型,长度等信息
-
-
每个文件有一个 File Footer:存的是每个 Stripe 的行数,每个 Column 的数据类型信息等
-
每个文件的尾部是一个 PostScript:记录了整个文件的压缩类型, File Footer 的长度信息等
-
读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 File Footer,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读
优缺点
- 优点
- 可压缩,高效的列存取
- 查询效率高
- 缺点
- 加载时性能消耗较大,需要通过text文件转换加载
- 读取全量数据性能低
- 适用于Hive中大型的存储、查询
Parquet格式
- 以二进制方式存储,所以不可以直接读取。文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的
- Row Group(行组):每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一 个行组,类似于 orc 的 Stripe 的概念
- Column Chunk(列块):在一个行组中,每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩
- Page(页):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式
- 通常情况下,在存储 Parquet 数据时,会按照 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度
优缺点
-
优点
- 更高效的压缩和编码
- 不与任何数据处理技术绑定,可用于多种数据处理框架
-
缺点
-
不支持update,delete,ACID
-
insert 以 Row Group 为单位追加到已有文件
-
-
适用于字段数非常多,无更新,只取部分列的查询
数据倾斜
Group By
,某个字段的数量过多,处理该字段的 Reduce非常耗时
- 解决措施:
set hive.groupby.skewindata = true;
,生成的查询计划中有两个MR Job,第一个 MR 为这个 Key 加上随机数打散,再用一个MR聚合
distinct、count(distinct XX)
,某个字段特殊值过多,处理此特殊值的 Reduce非常耗时
- 解决措施:能先进行 Group By 就先进行,之后再 count
大表 join 小表,小表中的 key 集中,使得分发到某一个或几个 Reduce上的数据远高于平均值
- 解决措施:使用 Map Join,
set hive.auto.convert.join=true;
,将小表放入内存中,在Map端完成Join,不用进行耗时的Reduce
大表 join 大表,分桶字段的0值或者空值过多,处理此0值或空值的 Reduce非常耗时
- 解决措施:创建大表时使用分桶,并将空值或0值转换为随机数打散
小文件问题
小文件归档:将小文件归档成一个har
文件,对NN来说这是一个整体,允许NN对文件进行透明的访问,对HDFS内部还是一个个独立小文件,减少对NN内存的占用
- 归档
hadoop archive -archiveName input.har -p /input /output
- 解档
hadoop fs -cp har:///output/input.har/* /
FileInputFormat
:每次切片时,都要判断切完剩下部分是否大于块的1.1倍,不大于1.1倍就只划分一块切片
CombineTextInputFormat
:将多个小文件从逻辑上规划到一个切片中。使多个小文件交给一个MapTask处理,设置虚拟存储切片值CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); //4m
Kafka
提高吞吐量
生产者
batch.size
:批次大小,默认16k,可调大linger.ms
:等待时间,默认0,可修改为5-100mscompression.type
:压缩方式,可选择snappyRecordAccumulator
:缓冲区大小,默认32m,可修改为64m
消费者
- 如果是Kafka消费能力不足:考虑增加Topic的分区数,并且同时提升消费组的消费者数量,使消费者数 = 分区数。(两者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量(默认500条)或字节数(默认50m)。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压
数据可靠性
数据完全可靠条件:ACK级别设置为-1,分区副本大于等于2,ISR里应答的最小副本数量大于等于2
ISR
:in-sync replica set,指和Leader保持同步的Follower和Leader集合(leader:0,ISR:0,1,2)
- 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被移出ISR。该时间阈值由
replica.lag.time.max.ms
参数设定,默认30s。因此不用长时间等待联系不上或已经故障的节点
数据一致性
不论是老的Leader还是新选举的Leader,消费者都能读到一样的数据
重要概念
-
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1
-
HW(High Watermark):所有副本中最小的LEO,消费者可见的最大offset
Follower故障处理
- Follower故障后会被临时踢出ISR
- 期间Leader和Follower继续接收数据
- 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步
- 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了
Leader故障处理
- Leader发生故障之后,会从ISR中选出一个新的Leader
- 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
数据有序性
传统队列:并发消费,消息异步发送到消费者导致顺序错乱
Kafka:每个分区数据只发送给一个消费者组(在 Kafka 1.x以后,启用幂等性后,Kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的)
注意:Kafka只保证一个分区内消息有序性。如果需要保证Topic中所有消息的有序性,则需要让这个Topic只有一个分区,也就只有一个消费者组消费他
幂等性
幂等性原理
- 幂等性指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
- 数据重复判断标准:具有
<PID, Partition, SeqNumber>
相同主键的消息提交时,Broker只会持久化一条PID
:Kafka每次重启都会分配一个新的PID
Partition
:表示分区号Sequence Number
:单调自增- 幂等性只能保证的是在单分区单会话内不重复
- 设置参数:
enable.idempotence
,默认true,改为false则关闭
文件存储机制
Topic:逻辑上的概念
Partition:物理上的概念
- 每个Partition对应于一个log文件,该log文件中存储的就是生产者生产的数据。
- 生产者生产的数据会被不断追加到该log文件末端。为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制。
- 每个log文件分为多个segment。每个segment包括:
.index
、.log
、.timeindex
等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0.index
:偏移量索引文件.log
:日志文件,也就是数据文件.timeindex
:时间戳索引文件- 说明:
.index
和.log
文件以当前segment的第一条消息的offset命名
Log文件和Index文件详解:稀疏索引
如何在log文件中定位到offset=600的Record?
- 根据目标offset定位Segment文件
- 找到小于等于目标offset的最大offset对应的index索引项
- 定位到log文件
- 向下遍历找到目标Record
注意:
- index为稀疏索引。大约每往log中写入4kb数据,就会往index中写入一条索引。
log.index.interval.bytes
默认4kb- index文件中保存的是相对offset(即在这个segment中的相对位置),确保offset的值所占空间不会过大,能控制在固定大小
生产者消息发送流程
发送原理
外部数据 --> Kafka 生产者 --> 创建main
线程和Sender
线程
-
main
线程将消息发送给RecordAccumulator
-
send(ProducerRecord)
-->Interceptors
拦截器 -->Serializer
序列化器 -->Partitioner
分区器 -->RecordAccumulator
-
RecordAccumulator
:默认32m -
ProducerBatch
:main
线程传来的数据,被放入DQueue
,等待读取 -
发送条件
batch.size
:只有数据积累到batch.size
之后,sender
才会发送数据。默认16klinger.ms
:如果数据迟迟未达到batch.size
,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟
-
数据发送之后就从
DQueue
中被清除
-
-
Sender
线程不断从RecordAccumulator
中拉取消息发送到Kafka Broker- Sender读取数据 --> NetworkClient --> InFlightRequests --> Selector --> 发送到Kafka Broker --> 等待应答acks,若失败则不断重试,Broker节点最多缓存5个请求
- 应答acks
- 0:生产者发送过来的数据,不需要等数据落盘应答
- 1:生产者发送过来的数据,Leader收到数据后应答(通常用于普通日志文件)
- -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。(-1和all等价)
Broker工作流程
- Broker启动后在zookeeper中注册(
/brokers/ids/ [0,1,2]
) - 先注册的Broker成为Controller,zookeeper记录此信息
- 由选举出来的 Controller 监听 zookeeper 中 Brokers 节点变化(
/brokers/ids/ [0,1,2]
) - Controller决定Leader选举
- 选举规则:在ISR中存活为前提,按照AR中排在前面的优先。如AR[1,0,2],ISR[0,1,2],则Leader按照1,0,2的顺序轮轮询
- Controller 将节点信息上传到 zookeeper (
/brokers/topics/topic_name/partitions/0/state "leader:0","isr":[0,1,2]
) - 其他 Controller 从zookeeper 同步相关节点信息(以便原 Controller 故障时替换)
- 假设Broker1中Leader挂了
- Controller 监听到zk节点变化,从zk获取ISR
- 选举新的Leader(在ISR中存活为前提,按照AR中排在前面的优先)
- 更新zk中Leader及ISR信息
消费者组原理
初始化
Coordinator:辅助实现消费者组的初始化和分区的分配
coordinator
节点选择 =groupid
的hashcode
值 % 50(consumer_offsets
的分区数量)- 例如:
groupid
的hashcode
值 = 1,1% 50 = 1,那么consumer_offsets
主题的1号分区在哪个broker上,就选择这个节点的coordinator
作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
- 每个 Consumer 都发出 Join Group 请求
- Coordinator 选出一个Consumer作为Leader
- Coordinator 把要消费的topic情况发送给Consumer Leader
- Consumer Leader 制定消费方案
- Consumer Leader 把消费方案发给 Coordinator
- Coordinator 把这个消费方案发送给各个 Consumer
- 每个消费者都会和 Coordinator保持心跳(默认3s)
- 一旦超时(
session.timeout.ms=45s
),该消费者会被移除,并触发再平衡 - 消费者处理消息的时间过长(
max.poll.interval.ms=5mins
),也触发再平衡
- 一旦超时(
Kafka与传统消息队列的区别
-
分区性:分区内消息有序
-
高可用:副本策略,Leader选举
-
负载均衡:分区内消息只会被消费者组中一个消费者消费,主题中消息可以均衡发送给消费者组中的所有消费者消费