Hbase是列存储的,Hbase支持行级事务
写出hive和hbase的区别
共同点:
1.hbase 与 hive 都是架构在 hadoop 之上的。都是用 hadoop 作为底层存储
区别:
2.Hive 是建立在 Hadoop 之上为了减少 MapReduce jobs 编写工作的批处理系统,HBase是为了支持弥补 Hadoop 对实时操作的缺陷的项目 。
3.想象你在操作 RMDB 数据库,如果是全表扫描,就用 Hive+Hadoop,如果是索引访问,就用 HBase+Hadoop 。
4.Hive query 就是 MapReduce jobs 可以从 5 分钟到数小时不止,HBase 是非常高效的,肯定比 Hive 高效的多。
5.Hive 本身不存储和计算数据,它完全依赖于 HDFS 和 MapReduce,Hive 中的表纯逻辑。
6.hive 借用 hadoop 的 MapReduce 来完成一些 hive 中的命令的执行
7.hbase 是物理表,不是逻辑表,提供一个超大的内存 hash 表,搜索引擎通过它来存储索引,方便查询操作。
8.hbase 是列存储。
9.hdfs 作为底层存储,hdfs 是存放文件的系统,而 Hbase 负责组织文件。
10.hive 需要用到 hdfs 存储文件,需要用到 MapReduce 计算框架。


hive中order by,sort by,distribute by,cluster by作用和用法
https://blog.csdn.net/weixin_41122339/article/details/81708373?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1
1、order by
(1):order by后面可以有多列进行排序,默认按字典排序。
(2):order by为全局排序。
(3):order by需要reduce操作,且只有一个reduce,无法配置(因为多个reduce无法完成全局排序)。
order by操作会受到如下属性的制约:
set hive.mapred.mode=nonstrict; (default value / 默认值)
set hive.mapred.mode=strict;
注:如果在strict模式下使用order by语句,那么必须要在语句中加上limit关键字,因为执行order by的时候只能启动单个reduce,如果排序的结果集过大,那么执行时间会非常漫长。
2、sort by
(1) sort by 为reduce内排序。只保证每个reducer的输出有序,不保证全局有序。
(2) sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响
3、distribute by
按照指定的key分发数据,并保证key相同的会被划分到同一个reduce。
如:insert overwrite local directory '/home/hadoop/out' select * from test order by name distribute by length(name);
此方***根据name的长度划分到不同的reduce中,最终输出到不同的文件中。
4、cluster by
cluster by key = distribute by key sort by key desc ,是一个简写。cluster by只能降序排序。


外部表的创建和删除:
【创建】什么时候需要创建外部表?

  • 当一份数据需要被多种工具分析时,说明所有权不属于Hive,这个时候可以创建一个外部表指向这份数据,因此在创建表时也不会将数据真正移动到HDFS的目录下。
    【删除】删除外部表时会删除表中数据吗?
  • 不会。当删除外部表时,Hive会认为没有完全拥有这份数据,只会该外部表的元数据信息,而不会删除表中的数据。
    内部表在删除表时会将表的数据和元数据一起删除
    Hive学习

1.Hive是什么
1.1 Hive概念
1.2 Hive与数据库区别
1.3 Hive的优点和缺点
1.4 Hive架构原理
2.Hive的交互方式
3 Hive的数据类型
3.1 基本数据类型
3.2 复合数据类型
4.Hive的数据类型转换
5.Hive的DDL操作
5.1 Hive的数据库DDL操作
5.2 Hive的表DDL操作(重要)
5.2.1 建表语法:
5.2.2 创建内部表,不加external就是内部表
5.2.3 创建外部表
5.2.4 导入数据:
5.2.5 内部表和外部表的互相转换
5.2.6 内部表与外部表的区别
5.3 hive的其他命令
6.Hive的分区表
6.1hive的分区表的概念
6.2 分区表的创建
7. hive修改表
8. Hive数据的导入和导出
8.1Hive数据导入
8.1.1 向表中加载数据
8.1.2 通过查询语句向表中插入数据
8.1.3查询语句中创建表并导入数据
8.1.4创建表时通过location指定加载数据路径
8.1.5import数据到指定hive表中
8.2 Hive数据导出
8.2.1 insert导出
8.2.2Hadoop命令导出
8.2.3 Hive shell命令导出
8.2.4export导出到HDFS上
9.hive的静态分区和动态分区以及分桶表
9.1 静态分区
9.2 动态分区
9.3hive的分桶表
10 hive的DML
10.1 基本查询
11.hive的数据压缩和文件存储格式
11.1数据的压缩说明
11.2数据压缩使用
11.3hive的文件存储格式
11.3.1 hive的文件存储格式说明
11.3.2 文件存储格式的使用对比
11.3.3 Orc支持的三种压缩
11.4 存储方式和压缩总结
12.hive的函数
12.1系统内置函数
12.2自定义函数
12.2.1 自定义函数编程步骤
12.2.2 自定义函数案例实战
13.hive的SerDe
13.1 hive的SerDe是什么
13.2SerDe的类型
13.3企业实战
14.Hive的企业级调优(重要)
14.1Fetch抓取
14.2 本地模式
14.3 表的优化
14.3.1 大表join小表
14.3.3 map join
14.3.4 设置hive的group by参数
14.3.5 count(distinct)
14.3.6 笛卡尔积
14.4 使用分区裁剪和列裁剪
14.5 并行执行
14.6 严格模式
14.7 JVM重用
14.8 推测执行
14.9 压缩
14.10 数据倾斜
Hive课后题

  1. 将数据直接传到HDFS分区目录上,怎么让分区表和数据产生关联?
  2. 桶表是否可以直接通过load将数据导入?
  3. hive的分区可以提高效率,那么分区是否越多越好?为什么?
  4. 什么情况下Hive可以避免进行mapreduce?
  5. order by ,sort by , distribute by , cluster by 的区别?
  6. 如何将数据以动态分区的方式插入分区表中?
  7. 数据倾斜现象和解决办法?(重要)
    修改记录

为什么数据要进行分布式存储?
考虑安全性,做冗余备份;为了方便于分布式计算。
1.Hive是什么
1.1 Hive概念
​ Hive是由FaceBook开源,主要用于解决海量结构化日志的数据统计。它是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类SQL查询功能,本质上是将HSQL转化成MR程序。
1.2 Hive与数据库区别

对于数据操作来说,Hive不支持数据更新删除主要原因是Hive在HDFS中存储,进行删除是物理删除,代价比较高,所以只支持覆盖和追加
Hive扩展性好是可以在多个集群的服务器上做应用开发
Hive的读时模式快,这点指Hive加载数据到表中时不会做数据的校验,在读取数据时才校验,它的查询延迟主要浪费在资源调度上,进行任务划分然后进行计算任务的申请。
1.3 Hive的优点和缺点
优点:
操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手)。
避免了去写MapReduce,减少开发人员的学习成本。
Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。
缺点:
Hive 不支持记录级别的增删改操作
Hive 的查询延时很严重(MR程序需要资源调度,任务计算)
Hive 不支持事务
1.4 Hive架构原理

1、用户接口:Client
CLI(提供交互shell方式接入hive)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive)
2、元数据:Meta
元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;
默认存储在自带的derby数据库中,有安装MySQL就可存储在Mysql中
3、Hadoop集群
使用HDFS进行存储,使用MapReduce进行计算。
4、Driver:驱动器
解析器(SQL Parser) 对SQL进行语法分析和语义分析。
将SQL字符串转换成抽象语法树AST
对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。
编译器(Physical Plan):将AST编译生成逻辑执行计划。
优化器(Query Optimizer):对逻辑执行计划进行优化。
执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于Hive来说默认就是mapreduce任务
Hive的工作原理(参考上面的架构图)
​ Hive首先是一个客户端工具,它提供了一些用户可操作的接口,可以通过交互shell,JDBC和web UI方式连接Hive,在Hive的内部有个Driver驱动器,驱动器里面实现了解析器,编译器,优化器和执行器的功能,在用Hsql查询表时,sql语句在驱动器中会先做语法和语义解析,解析之后再进行相应的语法编译,然后在通过优化器时产生逻辑计划和物理计划,并进行优化,最后在执行器中转换成对应的mr jar包,打包给hadoop集群运行获得结果。
Hive将表和存放在hdfs的数据建立映射关系,在用SQL分析语句查询表时,sql语句会被解析成对应的mr程序,mr程序调用jar包并从元数据获取数据存储的地址进行相关计算,最后由MapReduce得出结果。元数据是存放在hive指定的数据库,比如mysql中。

2.Hive的交互方式
​ 前提是先启动Hadoop集群和mysql服务。
​ 交互方式有三种:Hive交互Shell,Hive JDBC服务和Hive的命令
Hive交互shell
通过输入在hive中的/bin目录下的hive命令,进入hive,直接输出sql查询
cd /opt/bigdata/hivebin/hivehive>
#直接查询,和mysql一样;
Hive JDBC服务。
启动hiveserver2服务
前台启动 bin/hive --service hiveserver2
后台启动 nohup bin/hive --service hiveserver2 &
然后 beeline连接hiveserver2
#首先查询node1的端口,netstat nlp可以查看当前启动了哪些服务和对应的端口
netstat nlpbin/beelinebeeline> !connect jdbc:hive2://node1:10000
Hive的命令方式
hive -e sql语句
cd /opt/bigdata/hivebin/hive -e "show databases;"

hive -f sql文件 可以执行包含有sql语句的文件,挺常用
/bin/hive -f textsql.sql

3 Hive的数据类型
​ hive的数据类型要注意有基本类型和复合数据类型,复合多是数组,键值对这种,当然还有能存储复杂json格式文本的数据类型。
3.1 基本数据类型
类型名称 描述 举例
boolean true/false true
tinyint 1字节的有符号整数 1
smallint 2字节的有符号整数 1
int 4字节的有符号整数 1
bigint 8字节的有符号整数 1
float 4字节单精度浮点数 1.0
double 8字节单精度浮点数 1.0
string 字符串(不设长度) “abc”
varchar 字符串(1-65355长度,超长截断) “abc”
timestamp 时间戳 1563157873
date 日期 20190715
3.2 复合数据类型
类型名称 描述 举例
array 一组有序的字段,字段类型必须相同 array(元素1,元素2) Array(1,2,3)
map 一组无序的键值对 map(k1,v1,k2,v2) Map(‘a’,1,‘b’,2)
struct 一组命名的字段,字段类型可以不同 struct(元素1,元素2) Struct(‘a’,1,2,0)
array字段的元素访问方式:
下标获取元素,下标从0开始
获取第一个元素
array[0]
map字段的元素访问方式
通过键获取值
获取a这个key对应的value
map[‘a’]
#用法select map_key('???'),map_values('?????') from user;

struct字段的元素获取方式
定义一个字段c的类型为struct{a int;b string}
获取a和b的值
使用c.a 和c.b 获取其中的元素值
这里可以把这种类型看成是一个对象
createtable complex( col1 array<int>, col2 map<string,int>, col3 struct<a:string,b:int,c:double>)</int>

4.Hive的数据类型转换
​ 主要有两种,一种是隐式转换,小的会自动转换成大的,另一种是用cast函数手动转换
隐式类型转换
系统自动实现类型转换,不需要用户干预
如tinyint可以转换成int,int可以转换成bigint。
所有整数类型、float 和 string类型都可以隐式地转换成double。
tinyint、smallint、int都可以转换为float。
boolean类型不可以转换为任何其它的类型。
手动类型转换
可以使用cast函数操作显示进行数据类型转换
cast (‘1’ as int)将把字符串’1’ 转换成整数1;
如果强制类型转换失败,如执行cast(‘x’ as int),表达式返回空值 NULL。
5.Hive的DDL操作
5.1 Hive的数据库DDL操作
#创建数据库
hive> show databases;
#显示数据库
hive> show databases like 'db_hive*';
#查看数据库详情
hive> desc database db_hive;
#显示数据库详细信息
hive> desc database extended db_hive;
#切换当前数据库
hive > use db_hive;
#删除数据库
hive> drop database if exists db_hive;
#如果数据库中有表存在,那么要加cascade强制删除
hive> drop database if exists db_hive cascade;

5.2 Hive的表DDL操作(重要)
5.2.1 建表语法:
CREATE[EXTERNAL]TABLE[IFNOTEXISTS] table_name [(col_name data_type [COMMENT col_comment],...)][COMMENT table_comment][PARTITIONED BY(col_name data_type [COMMENT col_comment],...)] 分区[CLUSTEREDBY(col_name, col_name,...) 分桶[SORTED BY(col_name [ASC|DESC],...)]INTO num_buckets BUCKETS][ROW FORMAT row_format]row format delimited fieldsterminatedby “分隔符”collection items terminatedby':'map keysterminatedby':'[STORED AS file_format][LOCATION hdfs_path]

EXTERNAL 指定一个外部表
PARTITIONED BY 创建分区表
CLUSTERED BY 创建分桶
SORTED BY 按照字段排序(一般不常用)
ROW FORMAT 指定每一行中字段的分隔符
比如:row format delimited fields terminated by ‘\t’
STORED AS 指定存储文件类型,
常用的存储文件类型:SEQUENCEFILE(二进制序列文件)、TEXTFILE(文本)、RCFILE(列式存储格式文件)
如果文件数据是纯文本,可以使用STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCEFILE。
LOCATION 指定表在HDFS上的存储位置。值得一提的是,假如建表后指定了location,后期直接往这个地址存储sql格式的数据,那么对应的表也是能查到这部分新增的数据的,因为这里建表定义的location实际上就是指向某个文件地址。
5.2.2 创建内部表,不加external就是内部表
​ 有三种常用建表方法
直接建表
create table if not exists student(id int, name string)row format delimited fields terminated by '\t' (如果不指定,整行数据会被当做一个字段)stored as textfile;(不指定就默认是文本文件)
查询建表法
#通过AS语句,将查询的子结果存在新表里create table if not exists student1 as select id, name from student;
Like建表法
create table if not exists student2 like student;
查詢表類型:可以查出很多表结构信息
hive > desc formatted student;
5.2.3 创建外部表
create external table if not exists default.emp(id int,name string,age int)
row format delimited
fields terminated by '\t'
location '/hive/bigdata' (这个目录可以现实不存在,建表时创建)

创建外部表的时候需要加上external 关键字
location字段可以指定,也可以不指定
指定就是数据存放的具体目录
不指定就是使用默认目录 /user/hive/warehouse

5.2.4 导入数据:
在建表的数据库位置执行 loaddatalocal inpath '/home/hadoop/data/student.txt'intotable student;
需要注意的是,这里就算直接把txt文件的数据存放到hdfs上表对应的存储目录,那么在select*FROM时也能查出后来添加的数据,因为建表定义存放位置时,实际是指向具体的文件,将文件映射成表结构展示出来。

5.2.5 内部表和外部表的互相转换
#内部表改为外部表
alter table student set tblproperties('EXTERNAL'='TRUE');
#外部表改为内部表
alter table student set tblproperties('EXTERNAL'='FALSE');

5.2.6 内部表与外部表的区别
1.创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;创建外部表时需要加上external关键字,它仅记录数据所在的路径,不对数据的位置做任何改变
2.删除表时:删除表后,内部表的元数据和真实数据会被一起删除,而外部表仅删除元数据,不删除真实数据,这样外部表相对来说更加安全些,数据组织也比较灵活,方便共享原始数据。(直接重建原来的表后,数据就自动导入到原来的表去了,location直接指向原来存储的位置.)
外部表保障底层数据的安全性,内部表适用于管理中间表和结果表。
5.3 hive的其他命令
hive cli命令窗口查看本地文件系统
与操作本地文件系统类似,这里需要使用 ! (感叹号),并且最后需要加上 ;(分号)
例如
!ls /;
hive cli命令窗口查看HDFS文件系统
与查看HDFS文件系统类似,可以不用加hadoop或者hdfs
dfs -ls / ;
hive的底层执行引擎有3种
mapreduce(默认)
tez(支持DAG作业的计算框架)
spark(基于内存的分布式计算框架),企业可以切换这种框架,基于内存的速度更快。


6.Hive的分区表
6.1hive的分区表的概念
​ 把表的数据分目录存储,存储在不同的文件夹下,后期按照不同的目录查询数据,不需要进行全量扫描,提升查询效率。
​ 分区没有上限,但一般是3个以内。
6.2 分区表的创建
创建一个分区字段的分区表,一级分区
createtable student_partition1(id int,name string,age int)partitioned by(dt string)row format delimited fieldsterminatedby'\t';
创建二级分区
createtable student_partition2(id int,name string,age int)partitioned by(month string,day string)row format delimited fieldsterminatedby'\t';

  1. hive修改表

    修改表名称alter table student_partition1 rename to student_P1;#查看表的信息desc student_p1;desc formatted student_p1;#增加列alter table student add columns(address string);#修改列alter table student change columns address address_id int;#替换列alter table student replace columns(deptno string,dname string,loc string); --替换表中所有的字段。

修改分区(注意这里partition不加ed)
#添加单个分区
alter table student add partition(dt='20200402');
#添加多个分区
alter table student add partition(dt='20200401',dt='20200402');或者alter table student add partition(dt='20200401') partition(dt='20200402');
#删除分区
alter table student drop partition(dt='20200401');
alter table student drop partition('20200401'),partition(dt='20200402');
#查看分区show partitions student;

  1. Hive数据的导入和导出
  2. 1Hive数据导入
  3. 1.1 向表中加载数据
    ​ 语法:
    load data [local] inpath 'dataPath' override | into table student [partition 分区值];
    1
    ​ load data:表示加载数据
    ​ local:表示从本地加载数据到表,如果忽略不写则表示从HDFS加载数据
    ​ inpath:表示加载数据的路径
    ​ override:表示覆盖表中已有的数据,需要注意使用
    ​ into table:表示加载到哪张表
    ​ partition:表示上传到制定分区

    例如:load data local inpath '/opt/bigdata/person.txt' into table person partition(dt='20200401');

8.1.2 通过查询语句向表中插入数据
从指定的表中查询数据结果然后插入目标表中
insert into | override table student select * from XXXX;
insert into | override table student partition(dt='20200401') select * from XXXX;

8.1.3查询语句中创建表并导入数据
create table if not exists student as select * from XXXX;

8.1.4创建表时通过location指定加载数据路径
create table if not exists student (id int,name string)
row format delimited fields terminated by '\t
'location '/user/hive/warehouse/student1';
#然后往hdfs的路径上传对应的数据student.txt
hdfs fs -put /opt/bigdata/student.txt /user/hive/warehouse/student1

8.1.5import数据到指定hive表中
​ 注意先用export导出数据后,再import
createtable student2 like student1;
export table student1 to'/opt/bigdata/student1';(这里是hdfs的目录)
import table student2 from'/opt/bigdata/student1';

8.2 Hive数据导出
8.2.1 insert导出
1.将查询的结果导出到本地
insert override local directory '/opt/bigdata/student';
#默认文件分隔符会是‘\001’
#之后本地会生成一个日志型的文件
2.将查询的结果格式化后导出到本地
insert override local directory '/opt/bigdata/student' row format delimited fields terminated by ',';
3.将查询的结果导出到HDFS(注意这里没有local)
insert override directory '/export/student' row format delimited fields terminated by ',';

8.2.2Hadoop命令导出
​ get命令直接下载到本地磁盘
hdfs fs -get /usr/hive/warehouse/student/student.txt /opt/bigdata/data

8.2.3 Hive shell命令导出
有两种:
1.hive -e “sql语句” >> file; 这种是直接执行sql语句,把结果导出到文件中国。
2.hive -f “sql文件” > file; 这种是执行完sql文件后,将查询结果写入到file中
bin/hive -e 'select * from default.student;' >> /opt/bigdata/student/student.txt

8.2.4export导出到HDFS上
hive>export table student to '/usr/hive/warehouse/student';

9.hive的静态分区和动态分区以及分桶表
9.1 静态分区
​ 表的分区字段的值需要开发人员手动指定
创建分区表
createtable order_partition(order_number string,order_price double,order_time string)partitioned by(month string)
row format delimited fieldsterminatedby'\t';
导出数据到分区表
load data local inpath '/opt/bigdata/order_created.txt' overwide into table order_partition partition(month='20200401');

9.2 动态分区
​ 按照需求实现把数据自动导入到表的不同分区中,不需要手动指定
#创建表语句没有变
createtable order_partition(order_number string,order_price double,order_time string)partitioned by(month string)row format delimited fieldsterminatedby'\t';
#不同的是导入时动态指定分区值
#首先需要设置动态分区的参数
hive>set hive.exec.dynamic.partition=true;//使用动态分区
hive>set hive.exec.dynamic.partition.mode=nonstrict //非严格模式
#然后导入数据,注意这里字段查询的顺序,分区字段一定要放在最后,否则数据有问题
insertintotable order_partition partition(month)select order_number,order_price,order_time,monthfrom 源表;
#最后可以查看分区,注意partition加s
show partitions order_partition;

9.3hive的分桶表
​ 分桶是将整个数据内容按照某列属性值去hash值进行区分,对取得的hash再做模运算(columnValue.hashCode % 桶数),具有相同结果的数据进入同一个文件中。(本质上来说可以看成是对一个大文件进行拆分成小文件)
​ 比如将name列分为4个桶,则name属性的值取hash值后对4求模运算,取模结果0,1,2,3的分开存放到不同文件中。
作用:
取样更高效,没有分区的话需要扫描整个数据集。(可以看下哪些值出现频率比较高,也可以避免数据倾斜)
抽样查询桶表的语句:
tablesample(bucket x out of y)
x 表示从第几个桶开始取数
y 一共需要从 桶数/y 个桶中取数据
select*from user_buckets_demo tablesample(bucket 1outof2);
--假设建表时分了4个桶。--表示从表中抽样数据,从第一个桶开始,取两个桶的数,第二个桶是1+2 = 3,即从第一,第三个桶抽样取数据调查、--需要抽样的桶数 : 4/2 = 2个

提升某些查询操作效率,比如map side join(在对列分桶时,已经将相同值聚集在一起了,所以能提高查询效率)
案例
createtable user_buckets_demo(id int,name string)clusteredby(id)into4 bucketsrow format delimited fieldsterminatedby'\t';
加载数据到分桶表分桶表不能直接load加载数据,需要从其它表查询插入
Insert into table user_buckets_demo select*from user_demo;

10 hive的DML
10.1 基本查询
开启本地模式
如果没有开启本地模式,那么大部分函数操作会经过yarn调度分配,速度较慢。开启后就不会提交到yarn中去,能快速返回查询结果。
限制:如果文件超过256M或者文件个数超过4个,那么系统也会自动关闭本地模式。
set hive.exec.model.local.auto=true;
1
order by全局排序
order by在最后进行排序时只有一个reduce
每个MapReduce内部排序(Sort by) 局部排序
sort by:每个reduce内部进行排序,对全局结果集来说不是排序。sort by针对多个reduce中每个reduce进行排序。
设置reduce个数
set mapreduce.job.reduces=3;
查看reduce个数
set mapreduce.job.reduces;
将查询结果导入到文件中(按照成绩列降序排序)
insert override local directory'/opt/bigdata/sort' select * from student s sort by s.score;
#查询结果会被分成3个文件,每个文件都按成绩排序好。

distribute by 分区排序
字段.hashCode % reduce个数
distribute by :类似于mr中的partition,采用hash算法,在map端将查询结果中hash值相同的结果分发到对应的reduce中,结合sort by使用。
注意dirtribute by要在sort by之前
案例:
先设置reduce个数
set mapreduce.job.reduces=3;
通过distribute by 进行数据的分区,将不同的sid划分到不同的reduce中去
insert override local directory '/opt/bigdata/distribute'select*from student distribute by sid sort by score;
结果是先根据sid分到不同reduce,然后在每个reduce对成绩排序,那么如果这里reduce只有一个,其实跟全局排序效果是一样的

有一点要注意的是,distribute by和分桶的概念很像,都用到了hashcode和模运算,不同的是分桶是为了在hdfs存储时将大文件分成多个内部相关联的小文件,而distribute by 这里是针对查询的结果集进行一个排序展示。
cluster by
当distribute by 和sort by的字段一样时,可以用cluster by代替,效果是一样的。
11.hive的数据压缩和文件存储格式
11.1数据的压缩说明
​ 使用压缩的优势在于可以最小化所需要的磁盘空间,以及减少磁盘和网络的io操作。
压缩模式评价:
可以用以下三种标准对压缩方式进行评价
压缩比:压缩比越高,压缩后的文件越小,压缩比越高越好
压缩时间:越快越好
压缩之后的格式文件是否可分割:如果可以分割,那么单一文件可以由多个mapreduce程序处理,可以更好的并行化。
有四种常见压缩格式和物种编码解码器方式,这里暂时不阐述。
hadoop checknative #可以查看支持哪些压缩算法

11.2数据压缩使用
hive表中间数据压缩(可以针对shuffle阶段存入磁盘时进行压缩)
#设置为true,激活中间压缩算法的功能,默认时falseset hive.exec.compress.intermediate=true;
#设置中间数据的压缩算法。
压缩方式可以有五种选择(编码解码器方式),具体上网查。set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive表最终输出结果压缩(可以针对reduce输出后的结果)
set hive.exec.compress.output=trueset mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;

11.3hive的文件存储格式
11.3.1 hive的文件存储格式说明
hive的文件存储格式支持:textFile,sequencefile(kv键值对),orc(基于列存储),parquent (基于列存储)
其中textFile是默认格式,建表时默认是这种格式。而其他三种是不能直接从本地文件导入到hdfs的,数据需要先导入到textFile格式的表中,然后再通过textFile的表用insert导入到他们自身对应格式的表中。
textFile和sequenceFile都是基于行存储的
orc,parquent都是基于列存储的。
11.3.2 文件存储格式的使用对比
​ 详见《Hive主流文件存储格式对比.md》 https://blog.csdn.net/xsdxs/article/details/53152599
11.3.3 Orc支持的三种压缩
​ orc支持的三种压缩:ZLIB,Snappy,none,最后一种就是不压缩,默认时ZLIB
通过建表时 stored as orc tblproperties("orc.compress"="snappy") 来设置。
1
11.4 存储方式和压缩总结
orc默认的压缩方式ZLIB比snappy压缩的还小
在实际项目开发中,hive表的数据存储格式一般选择orc或者parquent
由于snappy的压缩和解压缩效率比较高,所以压缩方式一般选择snappy
即有效方式文件存储压缩是采用orc + snappy这种方式。
12.hive的函数
12.1系统内置函数
查看系统自带的函数
show functions
显示自带的函数用法
desc function 函数名
12.2自定义函数
​ 自带的函数有限,可以通过自定义UDF 来方便的扩展。
​ 自定义函数 UDF user-define-function
根据用户自定义函数类别分为以下三种:
UDF
一进一出
UDAF
聚合函数,多进一出
类似于count/max/min
UDTF
一进多出
12.2.1 自定义函数编程步骤
1.定义一个类继承org.apache.hadoop.hive.ql.UDF
2.需要实现evaluate函数,evaluate函数支持重载
3.将程序打包jar,上传到linux服务器
4.在hive的命令行窗口创建函数
添加jar包
add jar XXXX.jar9jiar包路径0
创建function
create [temporary(临时标志)] function [dbname.]function_name AS class_name(jar包中完整类名);
使用UDF函数
select function_name from student;
hive命令中删除函数
Drop [temprory] function [if exists] [dbname.数据库名字]function_name;
注意UDF必须有返回值,可以返回null,但不能是void
UDF函数永久使用
1.把自定义函数的jar上传到hdfs中
hdfs dfs -put hive_udf.jar /jars
2.创建永久函数
create function toUpper as 'com.kaikeba.udf.MyUDF' using jar 'hdfs://node1:9000/jars/hive_udf.jar';
1
3.在sql中像普通函数一样使用。
12.2.2 自定义函数案例实战
13.hive的SerDe
13.1 hive的SerDe是什么
​ SerDe是Serializer/Deserializer(序列化和反序列化)的简写,hive使用Serde进行 行对象的序列和反序列化,最后实现把文件内容映射到hive表中。
​ 先了解下hive如何读取数据,类似与hdfs中数据读写:
HDFS file -> InputFileFormat -> key,value -> Deserializer(反序列化) -> Row objectRow object -> Serializer(序列化) -> key,value -> OutputFileFormat -> HDFS file

13.2SerDe的类型
Hive中内置org.apache.hadoop.hive.serde2库,内部封装了很多不同serde类型
hive创建 表时,通过自定义的Serde或使用内置的serde库指定数据的序列化和反序列化方式:
CREATE[EXTERNAL]TABLE[IFNOTEXISTS] table_name[(col_name data_type [col_name...]...)][COMMENT table_comment][PARTITIONED BY(col_name data_type)][CLUSTEREDBY(col_name data_type)][SORTED BY(col_name [ASC|DESC]...)]INTO num_buckers BUCKETSROW FORMAT row_format[STORED AS file_format][LOCATION hdfs_path]

创建表时可以使用用户自定义的SerDe或者native SerDe,如果ROW FORMAT没有指定,或者指定了ROW FORMAT DELIMITED就会使用native SerDe类型
Hive SerDe:
Avro
ORC
RegEx
Thrift
Parquet
CSV
MultiDelimitSerDe
13.3企业实战
使用MultiDelimitSerDe,比如解决多字符分割场景,像某个导入的文件中内容有##这样的作为分隔符。
createtable t1(id int,name string)row format serde 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'WITH SERDEPROPERTIES("field.delim"="#
#");

通过RegexSerDe解决多字符分割场景(正则表达式)
这里也是拿###这种分隔符举例
createtable t1(id int,name string)row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'WITH SERDEPROPERTIES("input.regex"="^(.)\#\#(.).name")as name form t1;使用json_tuple(sonStr,k1,k2...),返回值是一个元组,直接解析k1,k2字段select json_tuple(jsonContext,'id','name')as(id,name)from t1;

14.Hive的企业级调优(重要)
14.1Fetch抓取
​ Fetch抓取是指Hive中对某些查询可以不必使用mapreduce计算。
​ 比如select * from
​ 在这种情况下,Hive可以简单地读取表对应存储目录下的文件,然后输出查询结果到控制台。
​ 在配置文件hive-default.xml.template文件中,可以设置hive.fetch.task.conversion为more,这样在全局查找,字段查找和limit查找等都不走mapreduce。
set hive.fetch.task.conversion=more;#直接返回,不走
mapreduceselect * from t1;select sex from t1;select * from t1 limit5;

14.2 本地模式
在Hive客户端测试时,默认情况下是启用hadoop的job模式,把任务提交到集群中运行,但我们可以通过设置本地模式让hive在单台机器上处理任务,针对小数居集,执行时间可以明显被缩短。
开启本地模式,并执行查询语句、set hive.exec.mode.local.auto=true;
设置本地模式的最大输入数据量,当输入数据量小于这个值时就采用本地模式,默认为128Mset hive.exec.mode.local.auto.inputbytes.max=50000000;
设置本地模式的最大输入文件个数,当输入文件个数小于这个数字时就启用本地模式set hive.exec.mode.local.auto.input.files.max=5;

14.3 表的优化
14.3.1 大表join小表
​ 先让小表(1000条记录以下的)进入到内存中,后续大表加载时可以直接从内存中获取小表的数据,整个过程都在map端执行,不需要用到reduce,也就不需要用到mr中的shuffle,能提升查询效率。
​ map join就是让多个数据结果直接在map端完成聚合操作,也就是在map端实现reduce端的逻辑。
14.3.3 map join
​ 如果不指定map join或者不符合map join的条件,那么Hive解析器会将join操作转换成common join,即在reduce阶段完成join,容易发生数据倾斜,可以用map join把小表全部加载到内存,在map端进行join,避免reduce处理。
工作机制:简单来说就是先将小表存入内存中,具体是分布式缓存,后面大表会启动一个没有reduce的task,并在map阶段和刚刚存储内存中的小表关联合并,然后输出结果文件。有多少个map task 就有多少个结果文件。
大表小表的阀值设置(默认25M以下认为是小表)
set hive.mapjoin.smalltable.filesize=25000000;
1
14.3.4 设置hive的group by参数
​ 并不是所有的聚合操作都需要在reducec端完成,很多聚合操作都可以现在map端先进行部分聚合,最后在reduce端得出最终结果(类似于mr过程中的combiner,预先合并压缩数据,再提供给reduce统计计算)
开启map端聚合参数设置。
是否在map段進行聚合,默认是true
set hive.map.aggr =true
在map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval =100000;
有数据倾斜的时候进行负载均衡,比如把相同特征的key分发到不同的reduce中(默认是false)
set hive.groupby.skewindata =true;

14.3.5 count(distinct)
​ 数据量小的话count(distinct)无所谓,但在数据量大的时候由于count(distinct)操作需要用到一个reduce task来完成,这个reduce 需要处理的数据量太大,就会导致整个job很难完成,这时可以改成先group by在count方式替换
每个reduce任务处理的数据量默认是256M
set hive.exec.reducers.bytes.per.reducer=32123456;
select count(distinct id)from log_text;改成:select count(id) from (select id from log_text groupby id);
虽然可能会多开一个job来运行groupby,但换取的查询速度是值得的。

14.3.6 笛卡尔积
​ 尽量避免笛卡尔积,产生笛卡尔积时,hive只能使用reduce来做相应处理,那么等待时间就变很长。
14.4 使用分区裁剪和列裁剪
​ 分区裁剪也就是加上分区条件,筛选的不必要的数据;
​ 同样列裁剪是加上具体需要的列的数据,少用select *
14.5 并行执行
​ 和oracle一样也可以利用并行执行提高查询速度,不同的是hive是靠参数来空值的
开启并行执行
set hive.exec.parallel =true;
设置同一个sql允许的最大并行度,默认是8
set hive.exec.parallel.thread.number =16;

14.6 严格模式
​ hive有可以设置严格模式,可以防止用户执行那些可能意想不到的,或者有不好影响的查询,比如等待了长时间发现是笛卡尔积,或者查询大表时忘记加分区条件。
set hive.mapred.mode= strict;
1
​ 开启严格模式后,就可以禁止以下三种操作:
(1)对于分区表,在where条件中必须要有分区字段的过滤条件来限制范围;
(2)对于使用了order by的查询,要求使用limit语句
(3)笛卡尔积;
违反以上三种情况,系统都会直接报错,不允许执行。
14.7 JVM重用
​ JVM重用可以使得JVM实例在同一个job中重新使用多次,减少进程的启动和销毁时间
#设置jvm重用个数set mapred.job.reuse.jvm.num.tasks =5;
1
2
14.8 推测执行
​ 说简单点就是Hadoop用了一个备胎来同时执行,跟原来的任务相比较,谁先执行完成就用谁的计算结果作为最终的计算结果。具体定义如下:
​ Hadoop采用了推测执行机制,它根据一定的法则推测出”拖后腿“的任务,并为这样的任务启动一个备份任务,让备份任务和原始任务同时处理一份数据,并最后选择优先执行完成的任务计算结果作为最终结果。
开启推测执行机制set hive.mapred.reduce.tasks.speculative.exection =true;

14.9 压缩
Hive表中间数据压缩
设置为true为激活中间数据压缩功能,默认是false,没有开启set
hive.exec.compress.intermediate =true;
设置中间数据的压缩算法
set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;
Hive表最终输出结果压缩
set hive.exec.compress.output =true;
set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;

14.10 数据倾斜
​ hive针对数据倾斜,还有以下方法可以解决
首先要合理设置Map数
通常情况下map数是根据输入文件总格数,文件大小和集群设置的文件块大小决定的,如果一个任务中有很多小文件,每个文件都当作一个块需要一个map任务来运行,那么此时不是越多map数越好,map任务的启动和初始化过程也会占用时间,这时就应该考虑减少map数
另外一种情况,假如某个map它处理的逻辑比较负责,比如某个文件有几千万的记录,那么只交给一个map处理的话明显耗费的时间很长,这时可以考虑增加map数
在map执行之前将小文件合并可以减少map数
在map执行前合并小文件,减少map数
CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

对复杂文件可以增加map数
当输入的文件都很大,任务逻辑复杂,map执行非常缓慢的时候可以设当增加map数,来使得每个map处理的数据量减少,从而提高速度
增加map方法为:
根据compute(SliteSize(Math.max(minSize,Math.min(maxSize,blocksizs))))公式
调整maxSize最大值,让maxSize小于blocksize就可以增加map数
mapreduce.input.fileinputformat.split.minsize =1;# 默认值为1
mapreduce.input.fileinputforma.split.maxsize = long.MAXValue;#默认值
有这样一个公式:Math.max(minSize,Math.min(maxSize,blockSize))
比如这样设置就可以达到增加map数的效果设置maxSize大小为10M,这样小于一个block128M的话,系统就会分配更多10M的map来处理复杂任务。
set mapreduce.input.fileinputformat.split.maxsize =10485760;

另外也要合理设置Reduce数
1.调整reduce个数的方法一:
设置每个reduce处理的数据量,默认是256M
set hive.exec.reducers.bytes.per.reducer =256000000;
设置每个任务最大的reduce数,默认是1009
set hive.exec.reducers.max =1009;

     这样reduce的个数就处于这样一个范围:总输入数据量 / 每个reduce处理的数据量 - 每个任务最大的reduce数

2.调整reduce个数的方法二:
设置每个job中reduce的个数,默认是1
set mapreduce.job.reduces =3;
1
但是reduce也不是越多越好的,同样过多的reduce任务的启动和初始化都会占用系统资源和时间。

Hive课后题

  1. 将数据直接传到HDFS分区目录上,怎么让分区表和数据产生关联?
    因为上传到hdfs后,hive没有对应元数据信息所以无法查询到对应数据。可以上传数据后给分区表添加该目录的分区
    dfs -mkdir -p 分区目录
    dfs -put 分区数据
    hive>alter table 表名 add partition(分区);

  2. 桶表是否可以直接通过load将数据导入?
    不可以,因为load数据的话hdfs下只会有一个文件无法完成分桶的效果,需要通过中间表导入数据

  3. hive的分区可以提高效率,那么分区是否越多越好?为什么?
    不是越多越好
    hive底层是存储在hdfs上的,hdfs是适合存储大文件而不适合小文件,如果有越多的分区,那么会增加namenode的负担。
    hive会转化成mr程序,mr会转化为多个task任务,多个个小文件的话,每个文件一个task,每个task运行一个JVM实例,JVM的开启和销毁都会降低系统性能。
    所以分区数要合理设计,一般在3个以内。

  4. 什么情况下Hive可以避免进行mapreduce?
    如果是进行简单的查询,直接select,不带count,sum这些聚合函数的,都不会走mapreduce,而是直接读取hdfs目录中的文件。
    另外如果查询语句中的过滤条件只是分区字段的情况下,也不会走mapreduce
    select*from order_partition wheremonth='2019-03';
    还有就是可以手动设置,让hive使用本地模式,当然这种有限制,需要查询的文件不超过256M或者文件数量不超过4个,否则系统还是会自动走mapreduce
    set hive.exec.mode.local.auto =true;

  5. order by ,sort by , distribute by , cluster by 的区别?
    Order by会对所给的全部数据进行全局排序,只启动一个reduce来处理。
    Sort by是局部排序,它可以根据数据量的大小启动一到多个reducer来工作,并且在每个reduce中单独排序。
    Distribute by 类似于mr中的partition,采用hash算法,在map端将查询结果中hash值相同的结果分发到对应的reduce中,结合sort by使用
    Cluster by 可以看作是distribute by 和sort by的结合,当两者后面所跟的字段列名相同时,效果就等同于使用cluster by,但是cluster by最终的结果只能是降序,无法指定升序和降序。

  6. 如何将数据以动态分区的方式插入分区表中?

  7. 首先创建对应分区表和一张普通表

  8. 然后将数据加载到普通表
    loaddatalocal inpath '/opt/bigdata/order_partition'intotable tt_order;

  9. 最后利用普通表来将数据加载到动态分区表中
    先设置使用动态分区的参数和使用非严格模式
    set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
    然后通过普通表导入分区表
    insertintotable order_partition partition(year,month)select order_number,order_price,substring(order_time,0,4)asyear,substring(order_time,6,12)asmonth from tt_order;
    注意导入的字段顺序,分区键一定要放在最后,否则会报错。

  10. 数据倾斜现象和解决办法?(重要)

  11. 什么是数据倾斜?
    大量相同特征的key出现在同一个reduce任务中,或者某个key对应的数据量远超过其它key的数据量,这种导致数据分布不均匀的现象就叫做数据倾斜。

  12. 数据倾斜的现象
    在执行任务的时候,任务进度长时间卡在99%左右,查看任务监控页面或者详细日志信息,发现只有少量,一个或者几个reduce子任务没有跑完,主要因为这几个reduce任务处理的数据量和其它reduce任务差异过大。这种单一reduce任务的记录数与平均记录数差异过大,,就会极大拖长计算时间。
    现实工作中可能会遇到这样的情况比较多:比如大表join小表,其中小表有特别的key值比较集中,这样分发到某一个reduce上的数据就会高于平均值;或者是大表join大表中,作为连接判断的字段0值或者空值较多的,这些0值和空值后续都会由一个reduce处理,导致这个reduce处理量过多;再有的情况就是group by、count( distinct )某个字段值数据多而导致reduce处理耗时的情况。

  13. 数据倾斜的原因
    key分布不均匀,比如空值,0值
    业务数据本身的特性。
    建表时考虑不周,导致后期join操作时数据倾斜
    某些sql语句本身就有数据倾斜。比如用count(distinct),它会单独用一个reduce来计算统计,如果数据量很大,就会导致整个job很难完成。这种情况可以先用group by分出需要统计的字段,再进行sum或者count

  14. 数据倾斜的解决方案,有三个层面可以思考处理:
    第一,SQL语句调优
    查询语句加上具体需要的列和分区键,有些复杂表的字段会存储json格式的文本,这些字段不一定是需要查询的就可以过滤掉,减轻reduce计算负担
    大表join小表时用map jion,让小表先进内存,然后大表与小表在map端完成join操作,避免reduce端处理。
    大表join大表中,可以把空值的key变成一个字符串然后加上rand()随机数,后续mr的分区操作会把倾斜的数据重新分发到不同的reduce上,从而避免数据倾斜。或者在join 的on条件中先让key为空的值 不参与关联,等key不为空的数据相互合并连接后再union all加回key为空的数据。
    selectfrom a leftouterjoin b oncasewhere id isnullthen concat('任意字符串',rand())else id end= b.id;
    select
    from log a join users b on a.id isnotnulland a.id = b.idunionall selct *from log a where a.id isnull;

查询语句中count(distinct) 改成group by + sum(),比如
select count(distinct id) from test; ==> select sum(id) from (select id from test group by id); 这种可能会多开一个reduce来完成group by的操作,但会明显提高查询速度。
针对不同数据类型产生的数据倾斜,存在这样的情况,A表中的id字段的数据类型是int,但join的B表中id字段存在脏数据,有一些是int类型但也有string类型的,那么再join操作时,默认的hash操作就会对int类型的key进行分配,而对于string类型的key会被统一分配到一个reduce中,这种情况就需要先进行类型转换,如 a join b on a.id = cast(b.id as int);
还有一些时候可以把数据倾斜的数据单独拿出来处理,然后再union all回去。
第二,通过设置hive参数配置解决,这种主要是优化计算速度,避免数据倾斜发生
开启map端聚合
并不是所有的聚合操作都需要在reducec端完成,很多聚合操作都可以现在map端先进行部分聚合,最后在reduce端得出最终结果(类似于mr过程中的combiner,预先合并压缩数据,再提供给reduce统计计算)
再hive开启map端聚合后,一旦发现数据倾斜,系统就能自动负载均衡,把相同特征的key分发到不同的reduce中,主要通过hive.groupby.skewindata参数完成。
开启map端聚合的设置是否在map段進行聚合,默认是true
set hive.map.aggr =true在map端进行聚合操作的条目数目set hive.groupby.mapaggr.checkinterval =100000;有数据倾斜的时候进行负载均衡,比如把相同特征的key分发到不同的reduce中(默认是false)set hive.groupby.skewindata =true;

设置并行执行
和oracle一样也可以利用并行执行提高查询速度,不同的是hive是靠参数来空值的
开启并行执行set hive.exec.parallel =true;设置同一个sql允许的最大并行度,默认是8set hive.exec.parallel.thread.number =16;

设置压缩
压缩可以在map端要进行shuffle时压缩和在完成reduce输出时压缩
Hive表中间数据压缩
设置为true为激活中间数据压缩功能,默认是false,没有开启set hive.exec.compress.intermediate =true;设置中间数据的压缩算法set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;

Hive表最终输出结果压缩
set hive.exec.compress.output =true;set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;

推测执行
说简单点就是Hadoop用了一个备胎来同时执行,跟原来的任务相比较,谁先执行完成就用谁的计算结果作为最终的计算结果。具体定义如下:
​ Hadoop采用了推测执行机制,它根据一定的法则推测出”拖后腿“的任务,并为这样的任务启动一个备份任务,让备份任务和原始任务同时处理一份数据,并最后选择优先执行完成的任务计算结果作为最终结果。
#开启推测执行机制set hive.mapred.reduce.tasks.speculative.exection =true;

JVM重用
JVM重用可以使得JVM实例在同一个job中重新使用多次,减少进程的启动和销毁时间
#设置jvm重用个数set mapred.job.reuse.jvm.num.tasks =5;
合理设置map数和reduce数
在map执行之前将小文件合并可以减少map数
系统默认的格式,可以不用设置。
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

对复杂文件可以增加map数
增加map方法有一个公式:
compute(SliteSize(Math.max(minSize,Math.min(maxSize,blocksizs))))公式-
调整maxSize最大值,让maxSize小于blocksize就可以增加map数- minSize默认等于1,maxSize默认等于blockSize大小。比如这样设置就可以达到增加map数的效果设置每个map处理的文件maxSize大小为10M,这样小于一个block128M的话,系统就会分配更多10M的map来处理复杂任务。set mapreduce.input.fileinputformat.split.maxsize =10485760;

合理设置Reduce数,比如设置每个job中reduce的个数为3个
set mapreduce.job.reduces =3;

第三,修改MR程序去避免数据倾斜
可以在MR程序的reduce方法中追踪每个键的最大值,并且设置阈值,当超过该阈值时就可以认为发生了数据倾斜,然后输出到日志文件进行分析。
第二种是在编写MR程序时,从业务层面去考虑自定义的分区键是否合理。就跟ADS库建表时可以默认指定哪个字段作为分区键。
MR程序中改用TotalOrderPartitioner替换HashPartitioner,它可以通过对原始数据进行抽样得到的结果集来预设分区边界值,也就是能找出导致数据倾斜的key值,再分散处理。
MR程序中使用Combiner。