数据仓库

为什么要分层?

复杂问题简单化:将复杂任务分解成多层,每层只处理简单的任务,方便定位问题

减少重复开发:通过中间层数据,能减少重复计算,增加一次计算结果的复用性

隔离原始数据:过滤异常数据,对数据脱敏,使真实数据与统计数据解耦

关系建模与维度建模

关系模型

  • 严格遵守第三范式,数据冗余程度低,数据的一致性容易得到保证
  • 数据分布于众多的表中,查询会相对复杂
  • 在大数据的场景下,查询效率相对较低

维度模型

  • 以数据分析作为出发点,不遵循三范式,故数据存在一定的冗余
  • 面向业务,将业务用事实表和维度表呈现出来
  • 表结构简单,故查询简单,查询效率较高

星型模型和雪花模型

区别

  • 主要在于维度的层级。标准星型模型维度只有一层,而雪花模型可能会涉及多层

  • 雪花模型比较靠近3NF,但无法完全遵守,性能成本太高

  • 星座模型的区别是事实表的数量,有多张事实表

模型选择

  • 判断性能优先还是灵活优先

维度表和事实表

维度表

  • 一般是对事实的描述信息每一张维度表对应现实世界中的一个对象或者概念。 例如:用户、商品、日期、地区等
  • 特征
    • 维度表的范围很宽(具有多个属性、列比较多)
    • 跟事实表相比,行数相对较小,通常< 10万行
    • 内容相对固定:编码表,如时间维度表

事实表

  • 每行数据代表一个业务事件。例如:下单、支付、退款、评价等

  • 每一个事实表的行包括

    • 具有可加性的数值型的度量值(可统计次数、个数、金额等)
    • 与维度表相连接的外键,通常具有两个和两个以上的外键
  • 分类

    • 事务型事实表
      • 以每个事务或事件为单位。例如一个销售订单记录,一笔支付记录等,作为事实表里的一行数据
      • 一旦事务被提交,事实表数据被插入,数据就不再进行更改
      • 更新方式:增量更新
    • 周期型快照事实表
      • 不会保留所有数据,只保留固定时间间隔的数据。例如每天或者每月的销售额,或每月的账户余额等
      • 更新方式:全量更新
    • 累积型快照事实表
      • **用于跟踪业务事实的变化。**例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包、运输、和签收的各个业务阶段的时间点数据,来跟踪订单声明周期的进展情况。当这个业务过程进行时,事实表的记录也要不断更新

数仓分层

ODS

  • HDFS用户行为数据、HDFS业务数据
  • 规划处理
    1. 保持数据原貌不做任何修改,起到备份数据的作用
    2. 数据采用压缩,减少磁盘存储空间
    3. 创建分区表,防止后续的全表扫描

DIM与DWD(以业务过程为驱动)

  • 构建维度模型,一般采用星型模型,呈现的状态一般为星座模型
  • 维度建模步骤
    1. 选择业务过程:一条业务线对应一张事实表,如下单、支付、退款、物流等
    2. 声明粒度:定义事实表中的一行数据表示什么,尽可能选择最小粒度
    3. 确认维度:确定事实表中的维度外键,依据是如何描述业务,“谁,何时,做何事”
    4. 确认事实:确认事实表中的度量值

DWS与DWT(以需求为驱动)

  • 确定要建哪些宽表:以维度为基准
  • 确定宽表中的字段:站在不同维度去看事实表,重点关注事实表聚合后的度量值
  • DWS:存放的所有主题对象当天的汇总行为,例如每个地区当天的下单次数,下单金额等
  • DWT:存放的是所有主题对象的累积行为,例如每个地区最近7天(15天、30天、60天)的下单次数、下单金额等

ADS(以需求为驱动)

  • 对电商系统各大主题指标分别进行分析
    • 访客页面跳转路径
    • 用户变动
    • 用户行为漏斗
    • 用户留存率
    • 商品统计
    • 品牌复购率
    • 订单统计
    • 各地区订单统计
    • 优惠券统计
    • 活动统计

Hadoop

HDFS读写

HDFS读

  1. 客户端通过 DistributedFileSystem 向 NN 请求下载文件
  2. NN 通过查询元数据,找到存储有目标文件块的DN地址,返回用于读取的输入流对象
  3. Client 根据就近原则、负载均衡,挑选一台DN建立连接,读取文件
  4. DN开始传输数据给Client,从磁盘里读取数据输入流,一个数据块读完后,输入流将获取下一个数据块的位置信息,建立连接,继续读取
  5. 读取过程中,以Packet为单位来做校验。如遇异常,客户端向NN汇报,并从其他DN继续读取
  6. Client 以Packet为单位接收,先在本地内存缓存,然后写入目标文件磁盘
  7. 所有数据块读取完成后,客户端关闭输入流

HDFS写

  1. Client 向 NN 请求上传文件
  2. NN 检查 Client 权限、检查目标文件是否存在、父目录是否存在。检查通过则返回输出流对象,操作写入EditLog
  3. NN 响应 Client 是否可以上传文件
  4. Client 切分文件块,默认128M。向NN请求上传第一个Block,询问上传到哪几个DN
  5. NN 进行副本存储结果选择,返回DN1、DN2、DN3,表示采用这三个DN存储数据
    • DN1:本地节点
    • DN2:其他机架随机一个节点
    • DN3:DN2所在机架随机另一个节点
  6. Client 向DN1请求建立Block传输通道,DN1向DN2请求,DN2向DN3请求,将通信管道建立完成
  7. DN1、DN2、DN3逐级应答 Client
  8. Client 开始传输数据
    • 从磁盘读取数据放到本地内存缓存,准备传输第一个Block
    • 起始有一个Chunk大小的buf,当Chunk被填满后,计算Chunksum值进行校验,然后填塞入Packet
    • 当Packet被填满后,将这个Packet放入应答队列中等待应答
    • 进入应答队列的Packet回被另一线程按序取出,发送到下一个DN
    • DN每收到一个Packet就传给下一个DN,DN1传给DN2,DN2传给DN3
  9. 当第一个Block传输完成后,Client 再次请求NN上传第二个Block(重复3-7)
  10. 所有数据块写完后,客户端关闭输出流

Block、Packet、Chunk

Block:最大的单位,是最终存储在DN上的数据粒度,由dfs.block.size参数决定,默认128M,取决于客户端配置

Packet:中等的单位,是数据流向DN过程中的数据粒度,由dfs.write.packet.size参数决定,默认64k,以这个值为参考动态调整

Chunk:最小的单位,是数据流向DN过程中进行数据校验的数据粒度,由io.byte.per.checksum参数决定,默认512Byte

注意:事实上Chunk还包含一个4Byte的校验值,因此Chunk写入Packet时是516Byte

数据与校验值的比例为128:1,所以一个128M的Block会有一个1M的校验文件

MapReduce流程

Map阶段

  1. split :对输入文件切片,默认等于数据块大小。切片数量决定并发的MapTask数量
  2. map :按行读取数据,调用map函数,形成k-v键值对输出

Shuffle(Map端)

  1. input :将map的输出写入环形缓冲区(默认100M)
  2. partition :对map结果进行分区,分区规则为对key进行Hash分区
  3. spill—sort、combine :对不同分区数据进行快速排序,如果有设置combiner,则进行聚合。当缓冲区达到阈值(默认0.8)后,将文件溢写到磁盘
  4. merge :当溢写小文件超过一定数量(默认3)进行合并,归并排序

Shuffle(Reduce端)

  1. copy :按照不同分区,从每个map拉取输出文件数据到不同reduce节点上。数据量达到缓冲区阈值时写入磁盘,同样进行partition、combine、排序过程
  2. merge :不断将溢写文件进行归并排序,最终采用堆排序算法合并,成为reduce输入文件

Reduce阶段

  1. reduce :调用reduce函数,一个reducer对应一个文件输出

Yarn工作机制

  1. MR程序提交到客户端所在节点,jarmain方法中执行job.waitForCompletion()启动YarnRunner
  2. YarnRunner 向 ResourceManager 申请一个Application
  3. ResourceManager 将该MR程序的资源路径返回给 YarnRunner :hdfs://.../staging以及application_id
  4. YarnRunner 将运行所需资源提交到HDFS上(job.submit()后生成的Job.split Job.xml wc.jar
  5. 资源提交完毕后,YarnRunner 向 ResourceManager 申请运行 mrAppMaster
  6. ResourceManager 将用户请求初始化成一个 Task ,放入FIFO调度队列中等待
  7. 分配一个较空闲的 NodeManager1 从 ResourceManager 领取 Task 任务
  8. NodeManager1 创建容器 Container ,并产生 MrAppMaster
  9. NodeManager1 从 HDFS 上拷贝Job.split Job.xml到本地
  10. NodeManager1 上 MrAppMaster 向 ResourceManager 申请容器运行 MapTask
  11. 根据Job.split切片数量,分配相应数量的 NodeManager2 、NodeManager3 ,分别领取 Task 任务并创建容器 Container (也可以是同一个 NodeManager 创建两个 Container )
  12. NodeManager1 上 MrAppMaster 向接收到到任务的 NodeManager2 、NodeManager3 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask ,并对结果数据分区排序
  13. NodeManager1 上 MrAppMaster 等待所有 MapTask 运行完毕后,向 ResourceManager 申请容器运行 ReduceTask
  14. 根据不同业务需求设置的 Reducer 数量,启动相应的容器,领取 ReduceTask,并向 MapTask 获取相应分区的数据
  15. 程序运行完毕后,NodeManager1 上 MrAppMaster 会向 ResourceManager 申请注销自己

容量调度器和公平调度器

共同特点

  • 多队列
    • 每个队列可配置一定的资源量,每个队列采用FIFO调度策略
  • 容量保证
    • 管理员可为每个队列设置资源最低保证和资源使用上限
  • 灵活性
    • 如果一个队列中的资源有剩余,可以暂时共享给其他需要资源的队列
    • 一旦该队列中有新的应用程序提交,则借调的资源会归还给该队列
  • 多租户
    • 支持多用户共享集群和或应用程序同时运行
    • 为防止同一个用户的作业独占队列中资源,对同一用户提交的作业最大所占资源量进行限定

不同点

  • 核心调度策略不同
    • 容量调度器:优先选择资源利用率低的队列
    • 公平调度器:优先选择对资源的缺额比例大的(缺额:平均分配应得的资源-实际拥有的资源
  • 每个队列可以单独设置资源分配方式
    • 容量调度器:FIFO、DRF
    • 公平调度器:FIFO、DRF、FAIR

Hive

内部表和外部表的区别

  • 创建
    • 内部表可以直接创建,外部表需要添加EXTERNAL关键字修饰
  • 默认仓库存储路径
    • 内部表:默认创建在hdfs://uesr/hive/warehouse
    • 外部表:由LOCATION指明,若不指明,则也在hdfs://uesr/hive/warehouse下以外部表的表名创建一个文件夹
  • 删除
    • 内部表:将元数据和路径下的文件都删除
    • 外部表:只删除元数据,不删除路径下的文件
  • 加载数据
    • 内部表:会把数据移动到自己指定的路径下
    • 外部表:不会把数据移动到自己指定的路径下

应用

  • 内部表
    • ETL处理时,选择内部表做中间表,清理时将HDFS上的文件同时删除
    • 对数据的处理都是由HQL语句完成,可使用内部表
  • 外部表
    • 怕误删数据的情况,不担心数据损坏
    • 对数据的处理由HQL和其他工具一同处理时,使用外部表
    • 不用加载数据到Hive ,减少数据传输,还能共享

Hive文件存储格式

行列存储比较

行存储的特点

  • 查询满足条件的一整行数据时
    • 列存储则需要去每个聚集的字段找到对应的每个列的值
    • 行存储只需要找到其中一个值,其余的值都在相邻地方
    • 此时行存储查询的速度更快
  • TEXTFILE、SEQUENCEFILE

列存储的特点

  • 查询只需要少数几个字段时
    • 列存储每个字段的数据聚集存储,大大减少读取数据量
    • 列存储每个字段的数据类型相同,可以针对性地设计更好的压缩算法
  • ORC、PARQUET

TEXTFILE

  • 默认格式

  • 数据不做压缩,磁盘开销大,数据解析开销大

  • 可结合 Gzip、Bzip2 使用,

    • 但使用 Gzip 这种方式,Hive 不会对数据进行切分,从而无法对数据进行并行操作

Optimized Row Columnar(ORC)

  • Hive 0.11 版里引入的新的存储格式

  • 每个 Orc 文件由 1 个或多个 Stripe 组成(Stripe 一般为HDFS块大小),每个 Stripe 包含多条记录(记录按照列进行独立存储,对应 PARQUET 中的 row group)

    • 每个 Stripe 里有三部分组成Index DataRow DataStripe Foote

    • Index Data:一个轻量级的 index。默认是每隔 1W 行做一个索引。这里做的索引只是记录某行的各字段在 Row Data 中的 offset

    • Row Data:存的是具体的数据。先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储

    • Stripe Footer:存的是各个 Stream 的类型,长度等信息

  • 每个文件有一个 File Footer:存的是每个 Stripe 的行数,每个 Column 的数据类型信息等

  • 每个文件的尾部是一个 PostScript:记录了整个文件的压缩类型File Footer 的长度信息等

  • 读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 File Footer,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读

优缺点

  • 优点
    • 可压缩,高效的列存取
    • 查询效率高
  • 缺点
    • 加载时性能消耗较大,需要通过text文件转换加载
    • 读取全量数据性能低
  • 适用于Hive中大型的存储、查询

Parquet格式

  • 以二进制方式存储,所以不可以直接读取。文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的
  • Row Group(行组):每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一 个行组,类似于 orc 的 Stripe 的概念
  • Column Chunk(列块):在一个行组中,每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩
  • Page(页):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式
  • 通常情况下,在存储 Parquet 数据时,会按照 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度

优缺点

  • 优点

    • 更高效的压缩和编码
    • 不与任何数据处理技术绑定,可用于多种数据处理框架
  • 缺点

    • 不支持update,delete,ACID

    • insert 以 Row Group 为单位追加到已有文件

  • 适用于字段数非常多,无更新,只取部分列的查询

数据倾斜

Group By ,某个字段的数量过多,处理该字段的 Reduce非常耗时

  • 解决措施set hive.groupby.skewindata = true;,生成的查询计划中有两个MR Job,第一个 MR 为这个 Key 加上随机数打散,再用一个MR聚合

distinct、count(distinct XX) ,某个字段特殊值过多,处理此特殊值的 Reduce非常耗时

  • 解决措施:能先进行 Group By 就先进行,之后再 count

大表 join 小表,小表中的 key 集中,使得分发到某一个或几个 Reduce上的数据远高于平均值

  • 解决措施:使用 Map Join,set hive.auto.convert.join=true;,将小表放入内存中,在Map端完成Join,不用进行耗时的Reduce

大表 join 大表,分桶字段的0值或者空值过多,处理此0值或空值的 Reduce非常耗时

  • 解决措施:创建大表时使用分桶,并将空值或0值转换为随机数打散

小文件问题

小文件归档:将小文件归档成一个har文件,对NN来说这是一个整体,允许NN对文件进行透明的访问,对HDFS内部还是一个个独立小文件,减少对NN内存的占用

  • 归档
hadoop archive -archiveName input.har -p /input /output
  • 解档
hadoop fs -cp har:///output/input.har/*    /

FileInputFormat:每次切片时,都要判断切完剩下部分是否大于块的1.1倍,不大于1.1倍就只划分一块切片

CombineTextInputFormat:将多个小文件从逻辑上规划到一个切片中。使多个小文件交给一个MapTask处理,设置虚拟存储切片值CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); //4m

Kafka

提高吞吐量

生产者

  • batch.size:批次大小,默认16k,可调大
  • linger.ms:等待时间,默认0,可修改为5-100ms
  • compression.type:压缩方式,可选择snappy
  • RecordAccumulator:缓冲区大小,默认32m,可修改为64m

消费者

  • 如果是Kafka消费能力不足:考虑增加Topic的分区数,并且同时提升消费组的消费者数量,使消费者数 = 分区数。(两者缺一不可)
  • 如果是下游的数据处理不及时:提高每批次拉取的数量(默认500条)或字节数(默认50m)。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压

数据可靠性

数据完全可靠条件:ACK级别设置为-1,分区副本大于等于2,ISR里应答的最小副本数量大于等于2

ISR:in-sync replica set,指和Leader保持同步的Follower和Leader集合(leader:0,ISR:0,1,2)

  • 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被移出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。因此不用长时间等待联系不上或已经故障的节点

数据一致性

不论是老的Leader还是新选举的Leader,消费者都能读到一样的数据

重要概念

  • LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1

  • HW(High Watermark):所有副本中最小的LEO,消费者可见的最大offset

Follower故障处理

  • Follower故障后会被临时踢出ISR
  • 期间Leader和Follower继续接收数据
  • 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步
  • 等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了

Leader故障处理

  • Leader发生故障之后,会从ISR中选出一个新的Leader
  • 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

数据有序性

传统队列:并发消费,消息异步发送到消费者导致顺序错乱

Kafka:每个分区数据只发送给一个消费者组(在 Kafka 1.x以后,启用幂等性后,Kafka服务端会缓存producer发来的最近5个request的元数据, 故无论如何,都可以保证最近5个request的数据都是有序的)

注意:Kafka只保证一个分区内消息有序性。如果需要保证Topic中所有消息的有序性,则需要让这个Topic只有一个分区,也就只有一个消费者组消费他

幂等性

幂等性原理

  • 幂等性指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
  • 数据重复判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条
    • PID:Kafka每次重启都会分配一个新的PID
    • Partition:表示分区号
    • Sequence Number:单调自增
    • 幂等性只能保证的是在单分区单会话内不重复
  • 设置参数enable.idempotence,默认true,改为false则关闭

文件存储机制

Topic:逻辑上的概念

Partition:物理上的概念

  • 每个Partition对应于一个log文件,该log文件中存储的就是生产者生产的数据。
  • 生产者生产的数据会被不断追加到该log文件末端。为防止log文件过大导致数据定位效率低下,Kafka采取了分片索引机制。
  • 每个log文件分为多个segment。每个segment包括:.index.log.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0
    • .index:偏移量索引文件
    • .log:日志文件,也就是数据文件
    • .timeindex:时间戳索引文件
    • 说明:.index.log文件以当前segment的第一条消息的offset命名

Log文件和Index文件详解:稀疏索引

如何在log文件中定位到offset=600的Record?

  1. 根据目标offset定位Segment文件
  2. 找到小于等于目标offset的最大offset对应的index索引项
  3. 定位到log文件
  4. 向下遍历找到目标Record

注意:

  • index为稀疏索引。大约每往log中写入4kb数据,就会往index中写入一条索引。log.index.interval.bytes 默认4kb
  • index文件中保存的是相对offset(即在这个segment中的相对位置),确保offset的值所占空间不会过大,能控制在固定大小

生产者消息发送流程

发送原理

外部数据 --> Kafka 生产者 --> 创建main线程和Sender线程

  • main线程将消息发送给RecordAccumulator

    • send(ProducerRecord) --> Interceptors拦截器 --> Serializer序列化器 --> Partitioner分区器 -->RecordAccumulator

    • RecordAccumulator:默认32m

    • ProducerBatchmain线程传来的数据,被放入DQueue,等待读取

    • 发送条件

      • batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
      • linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟
    • 数据发送之后就从DQueue中被清除

  • Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker

    • Sender读取数据 --> NetworkClient --> InFlightRequests --> Selector --> 发送到Kafka Broker --> 等待应答acks,若失败则不断重试,Broker节点最多缓存5个请求
    • 应答acks
      • 0:生产者发送过来的数据,不需要等数据落盘应答
      • 1:生产者发送过来的数据,Leader收到数据后应答(通常用于普通日志文件)
      • -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。(-1和all等价)

Broker工作流程

  • Broker启动后在zookeeper中注册(/brokers/ids/ [0,1,2]
  • 先注册的Broker成为Controller,zookeeper记录此信息
  • 由选举出来的 Controller 监听 zookeeper 中 Brokers 节点变化(/brokers/ids/ [0,1,2]
  • Controller决定Leader选举
    • 选举规则:在ISR中存活为前提,按照AR中排在前面的优先。如AR[1,0,2],ISR[0,1,2],则Leader按照1,0,2的顺序轮轮询
  • Controller 将节点信息上传到 zookeeper (/brokers/topics/topic_name/partitions/0/state "leader:0","isr":[0,1,2]
  • 其他 Controller 从zookeeper 同步相关节点信息(以便原 Controller 故障时替换)
  • 假设Broker1中Leader挂了
    • Controller 监听到zk节点变化,从zk获取ISR
    • 选举新的Leader(在ISR中存活为前提,按照AR中排在前面的优先)
    • 更新zk中Leader及ISR信息

消费者组原理

初始化

Coordinator:辅助实现消费者组的初始化和分区的分配

  • coordinator节点选择 = groupidhashcode值 % 50( consumer_offsets的分区数量)
  • 例如: groupidhashcode值 = 1,1% 50 = 1,那么consumer_offsets主题的1号分区在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
  1. 每个 Consumer 都发出 Join Group 请求
  2. Coordinator 选出一个Consumer作为Leader
  3. Coordinator 把要消费的topic情况发送给Consumer Leader
  4. Consumer Leader 制定消费方案
  5. Consumer Leader 把消费方案发给 Coordinator
  6. Coordinator 把这个消费方案发送给各个 Consumer
  7. 每个消费者都会和 Coordinator保持心跳(默认3s)
    • 一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡
    • 消费者处理消息的时间过长(max.poll.interval.ms=5mins),也触发再平衡

Kafka与传统消息队列的区别

  • 分区性:分区内消息有序

  • 高可用:副本策略,Leader选举

  • 负载均衡:分区内消息只会被消费者组中一个消费者消费,主题中消息可以均衡发送给消费者组中的所有消费者消费