https://dev.mysql.com/downloads/mysql/
Download mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
Upload it to /usr/local. Note that some configuration files assume the default install path /usr/local/mysql, so it is best to install it there.
xz -d mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
This gives a tar file; you still need tar -xvf to extract the directory. Rename that directory to mysql.
Create a new /etc/my.cnf:
[mysqld]
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
port = 3306
socket = /usr/local/mysql/data/mysql.sock
user=root
character-set-server=utf8mb4
log-error = /usr/local/mysql/data/mysqld.log
pid-file = /usr/local/mysql/data/mysqld.pid
[client]
default-character-set=utf8mb4
socket=/usr/local/mysql/data/mysql.sock
[mysql]
default-character-set=utf8mb4
socket=/usr/local/mysql/data/mysql.sock
Go back to the mysql directory and run the following (note: --user must specify a user, otherwise the user table is not generated and starting the server fails):
./bin/mysqld --initialize --user=root --basedir=/usr/local/mysql/ --datadir=/usr/local/mysql/data
./support-files/mysql.server start
cat data/mysqld.log to find the generated temporary password.
./bin/mysql -uroot -p<temporary password> logs you in. The password must be changed first:
alter user 'root'@'localhost' identified by '123456';
Create a hive user that we will use to connect to MySQL:
create user 'hive'@'%' identified by 'hive';
GRANT ALL PRIVILEGES ON *.* TO 'hive'@'%';
flush privileges;
alter user 'hive'@'%' identified WITH mysql_native_password by '123456';
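Before wiring this account into Hive, a quick JDBC sanity check can confirm that the hive user can log in (a minimal sketch, assuming a MySQL Connector/J 8.x jar is on the classpath; the class name is arbitrary):
import java.sql.Connection;
import java.sql.DriverManager;

public class MySqlHiveUserCheck {
  public static void main(String[] args) throws Exception {
    // Connector/J 6+ registers com.mysql.cj.jdbc.Driver automatically and
    // usually wants an explicit serverTimezone, as in the hive-site.xml later on.
    String url = "jdbc:mysql://localhost:3306/?serverTimezone=Asia/Shanghai";
    try (Connection conn = DriverManager.getConnection(url, "hive", "123456")) {
      System.out.println("hive user connected: " + conn.isValid(5));
    }
  }
}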
wget http://mirror.bit.edu.cn/apache/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
tar -zxvf apache-hive-3.1.2-bin.tar.gz
mv apache-hive-3.1.2-bin hive-3.1.2
Add the following to /etc/profile:
export HIVE_HOME=/home/tangsong.math/repos/hive-3.1.2
export PATH=$PATH:$HIVE_HOME/bin
We also need to copy the MySQL driver jar into Hive's lib directory; you can download one from Maven Central.
In the hive/conf directory:
cp hive-env.sh.template hive-env.sh
vim hive-env.sh
and add the following:
HADOOP_HOME=/home/tangsong.math/repos/hadoop-3.2.1
export HIVE_CONF_DIR=/home/tangsong.math/repos/hive-3.1.2/conf
export HIVE_AUX_JARS_PATH=/home/tangsong.math/repos/hive-3.1.2/lib
Create hive-site.xml in the conf directory.
In hive/bin run ./schematool -dbType mysql -initSchema
If you see the error
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
it is because both Hadoop and Hive bundle Google's guava and the versions differ. Copy the newer guava jar from hadoop/share/hadoop/common/lib over the older one in hive/lib.
Create conf/hive-site.xml as follows. Note that for MySQL Connector 6 and above the driver class is com.mysql.cj.jdbc.Driver, no longer com.mysql.jdbc.Driver.
<configuration>
  <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&amp;serverTimezone=Asia/Shanghai</value></property>
  <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.cj.jdbc.Driver</value></property>
  <property><name>javax.jdo.option.ConnectionUserName</name><value>hive</value></property>
  <property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value></property>
  <property><name>datanucleus.readOnlyDatastore</name><value>false</value></property>
  <property><name>datanucleus.fixedDatastore</name><value>false</value></property>
  <property><name>datanucleus.autoCreateSchema</name><value>true</value></property>
  <property><name>datanucleus.schema.autoCreateAll</name><value>true</value></property>
  <property><name>datanucleus.autoCreateTables</name><value>true</value></property>
  <property><name>datanucleus.autoCreateColumns</name><value>true</value></property>
  <property><name>hive.metastore.local</name><value>true</value></property>
  <property><name>hive.cli.print.header</name><value>true</value></property>
  <property><name>hive.cli.print.current.db</name><value>true</value></property>
</configuration>
Hive 3 no longer supports MR as the execution engine, so Tez is needed. (One pitfall: some online tutorials use Tez 0.8.5, which is built against Hadoop 2.6; its FileInputFormat calls a public constructor of guava's StopWatch, but in newer guava versions that constructor is protected, so you get an IllegalAccessException.)
wget https://mirrors.tuna.tsinghua.edu.cn/apache/tez/0.9.0/apache-tez-0.9.0-bin.tar.gz
tar -zxvf apache-tez-0.9.0-bin.tar.gz
Initialize Hive:
bin/schematool -dbType mysql -initSchema
Add the following to Hadoop's configuration files, otherwise starting beeline may fail with the error below:
Error: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: root is not allowed to impersonate root (state=08S01,code=0)
hdfs-site.xml
<property><name>dfs.webhdfs.enabled</name><value>true</value></property>
core-site.xml
<property><name>hadoop.proxyuser.root.hosts</name><value>*</value></property>
<property><name>hadoop.proxyuser.root.groups</name><value>*</value></property>
Start Hive:
nohup bin/hive --service metastore 2>&1 &
To stop Hive you have to find the pid with ps -ef | grep hive and kill it.
Or:
bin/hive --service hiveserver2 &
Another option is to run ./hive directly.
Exit it with quit;
Now beeline can be started:
bin/beeline -u jdbc:hive2://localhost:10000 -n root
To exit beeline, run !exit (note the exclamation mark).
To configure the hiveserver2 web UI, add to hive-site.xml:
<property><name>hive.server2.webui.host</name><value>n227-026-077</value></property>
<property><name>hive.server2.webui.port</name><value>10002</value></property>
Then open:
http://10.227.26.77:10002/
Next, the pom.xml for a client project that accesses HDFS and Hive over JDBC:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>untitled3</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId><version>3.2.1</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.2.1</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.2.1</version></dependency>
    <dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>2.3.0</version></dependency>
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version></dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <!-- Disable annotation processing for ourselves. -->
          <compilerArgument>-proc:none</compilerArgument>
        </configuration>
      </plugin>
      <plugin>
        <groupId>com.coveo</groupId>
        <artifactId>fmt-maven-plugin</artifactId>
        <version>2.5.1</version>
        <executions>
          <execution>
            <goals><goal>check</goal></goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <!-- Prevent generated files from overwriting our own. -->
            <addMavenDescriptor>false</addMavenDescriptor>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
              </transformers>
              <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Note that in Hive an insert statement must be written as insert into table table_name
or insert overwrite table table_name.
In MySQL the 'table' keyword is not needed; the table name follows directly, e.g. Hive's insert into table total_cost values (...) versus MySQL's insert into total_cost values (...).
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Test {
public void loadLocalFile(String path) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
System.getProperties().setProperty("HADOOP_USER_NAME", "root");
FileSystem fs = FileSystem.get(conf);
String inputPath = "/input";
fs.mkdirs(new Path(inputPath));
fs.copyFromLocalFile(new Path(path), new Path(inputPath));
fs.close();
}
public void executeSql(String sql) throws Exception {
  String driverName = "org.apache.hive.jdbc.HiveDriver";
  String url = "jdbc:hive2://n227-026-077:10000";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  PreparedStatement stmt = conn.prepareStatement(sql);
  stmt.execute();
}
@org.junit.Test
public void test1() throws Exception {
  String driverName = "org.apache.hive.jdbc.HiveDriver";
  String url = "jdbc:hive2://n227-026-077:10000/hive_test";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  String sql =
      "create external table total_cost(id int,name string,cost int) partitioned by (p_date string) row format delimited fields terminated by ','";
  String pDate =
      ZonedDateTime.now()
          .plusDays(2)
          .format(DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.systemDefault()));
  // String sql = String.format("load data inpath '/input/total_cost_3.txt' overwrite into table total_cost partition (p_date=%s)", pDate);
  PreparedStatement stmt = conn.prepareStatement(sql);
  stmt.executeUpdate();
}
@org.junit.Test
public void test2() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://10.227.26.77:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  System.getProperties().setProperty("HADOOP_USER_NAME", "root");
  FileSystem fs = FileSystem.get(conf);
  String inputPath = "/input";
  fs.mkdirs(new Path(inputPath));
  fs.copyFromLocalFile(new Path("/Users/bytedance/repos/total_cost_3.txt"), new Path(inputPath));
  fs.close();
  String driverName = "org.apache.hive.jdbc.HiveDriver";
  String url = "jdbc:hive2://n227-026-077:10000/hive_test";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  // String sql = "create external table total_cost(id int,name string,cost int) partitioned by (p_date string) row format delimited fields terminated by ','";
  String pDate =
      ZonedDateTime.now()
          .plusDays(2)
          .format(DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.systemDefault()));
  String sql =
      String.format(
          "load data inpath '/input/total_cost_3.txt' overwrite into table total_cost partition (p_date=%s)",
          pDate);
  PreparedStatement stmt = conn.prepareStatement(sql);
  stmt.executeUpdate();
}
@org.junit.Test
public void test3() throws Exception {
  loadLocalFile("/Users/bytedance/Downloads/query_result.tsv");
  executeSql("create database if not exists analysis");
  executeSql(
      "create external table if not exists analysis.key_date (id int,name string,start_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'");
  executeSql("load data inpath '/input/query_result.tsv' overwrite into table analysis.key_date");
}
// Test ORC storage. You cannot load a plain text file directly into an ORC table; the data has to be written from another table.
// Hive's insert requires the table keyword before the table name (MySQL does not). Also, an insert must supply values for every column; you cannot insert into just a subset of columns.
@org.junit.Test
public void test4() throws Exception {
  // loadLocalFile("/Users/bytedance/Downloads/query_result.tsv");
  // executeSql("create external table if not exists analysis.key_date1 (id int,name string,start_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' STORED as ORC TBLPROPERTIES ('orc.compress'='ZLIB', 'orc.create.index'='true')");
  executeSql("insert overwrite table analysis.key_date1 select id, name, start_date from analysis.key_date where id<=35");
}
}
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class HiveTest {
public static void main(String[] args) throws Exception {
  String driverName = "org.apache.hive.jdbc.HiveDriver";
  String url = "jdbc:hive2://n227-026-077:10000/analysis";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  String sql = "select * from analysis.key_date1";
  PreparedStatement stmt = conn.prepareStatement(sql);
  ResultSet rs = stmt.executeQuery();
  List<CompassAnalysisKeyDate> list = new ArrayList<>();
  while (rs.next()) {
    CompassAnalysisKeyDate compassAnalysisKeyDate = new CompassAnalysisKeyDate();
    compassAnalysisKeyDate.setId(rs.getString("id"));
    compassAnalysisKeyDate.setName(rs.getString("name"));
    compassAnalysisKeyDate.setStartDate(rs.getString("start_date"));
    list.add(compassAnalysisKeyDate);
  }
  list.forEach(System.out::println);
}
public static class CompassAnalysisKeyDate {
private String id;
private String name;
private String startDate;
  @Override
  public String toString() {
    return "CompassAnalysisKeyDate{"
        + "id='" + id + '\''
        + ", name='" + name + '\''
        + ", startDate='" + startDate + '\''
        + '}';
  }

  public void setId(String id) {
    this.id = id;
  }

  public void setName(String name) {
    this.name = name;
  }

  public void setStartDate(String startDate) {
    this.startDate = startDate;
  }
}
}
Now let's look at the complex types: struct, array, and map.
When creating a table, the collection delimiters can be specified:
collection items terminated by '-';
map keys terminated by ':' ;
Suppose we have a table like this:
create external table analysis.game_info (id int, country_and_city struct<country:string,city:string>, category array<string>, extra_info map<string,string>)
row format delimited fields terminated by '\t' collection items terminated by ','
map keys terminated by ':'
Prepare the data as follows (VS Code seems to do some processing on tab characters, so this time we generate the file contents explicitly):
#include <stdlib.h>
#include <stdio.h>
int main(){
FILE* fp=fopen("/Users/bytedance/Downloads/query_result.tsv","w");
fprintf(fp,"1\tUS,NewYork\tcategory1,category2\tstorage:100,price:200\n");
fprintf(fp,"2\tCN,Shanghai\tcategory1,category3\tstorage:120,price:150");
return 0;
}
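The generated file can then be uploaded and loaded into the table by reusing the loadLocalFile and executeSql helpers from the Test class above (a sketch; the method name is arbitrary and the DDL is the one given a few lines earlier):
@org.junit.Test
public void loadGameInfo() throws Exception {
  // Upload the generated TSV to /input and load it into the complex-type table.
  loadLocalFile("/Users/bytedance/Downloads/query_result.tsv");
  executeSql(
      "create external table if not exists analysis.game_info (id int, "
          + "country_and_city struct<country:string,city:string>, category array<string>, "
          + "extra_info map<string,string>) "
          + "row format delimited fields terminated by '\\t' "
          + "collection items terminated by ',' map keys terminated by ':'");
  executeSql("load data inpath '/input/query_result.tsv' overwrite into table analysis.game_info");
}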
Reading the table's metadata. Note that columnIndex starts at 1.
@org.junit.Test
public void test6() throws Exception {
String driverName = "org.apache.hive.jdbc.HiveDriver";
String url = "jdbc:hive2://n227-026-077:10000/analysis";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  String sql = "select * from analysis.game_info limit 1";
  PreparedStatement stmt = conn.prepareStatement(sql);
  ResultSet rs = stmt.executeQuery();
  System.out.println("column count " + rs.getMetaData().getColumnCount());
  for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) {
    System.out.println(
        rs.getMetaData().getColumnTypeName(i + 1) + " " + rs.getMetaData().getColumnName(i + 1));
  }
}
Output:
column count 4
int game_info.id
struct game_info.country_and_city
array game_info.category
map game_info.extra_info
Standard JDBC has no collection types, so these columns cannot be read as collections, but they can be read as String:
@org.junit.Test
public void test7() throws Exception {
String driverName = "org.apache.hive.jdbc.HiveDriver";
String url = "jdbc:hive2://n227-026-077:10000/analysis";
  Class.forName(driverName);
  Connection conn = DriverManager.getConnection(url, "root", "");
  String sql = "select * from analysis.game_info";
  PreparedStatement stmt = conn.prepareStatement(sql);
  ResultSet rs = stmt.executeQuery();
  System.out.println("column count " + rs.getMetaData().getColumnCount());
  while (rs.next()) {
    Integer id = rs.getInt(1);
    String countryAndCity = rs.getString(2);
    String category = rs.getString(3);
    String extra = rs.getString(4);
    System.out.println(id + " " + countryAndCity + " " + category + " " + extra);
  }
}
1 {"country":"US","city":"NewYork"} ["category1","category2"] {"storage":"100","price":"200"}
2 {"country":"CN","city":"Shanghai"} ["category1","category3"] {"storage":"120","price":"150"}
Reading specific fields from the complex types:
select id,country_and_city.country,category[0],extra_info from analysis.game_info
Now for the classic lateral view explode.
explode can unnest an array or a map column (but not a struct):
select explode(category) from game_info;
The result of explode can be thought of as a virtual table, so it (and its column) must be given a name before it can be used together with other columns:
select id,tmp_field_name from game_info lateral view explode(category) tmp_table_name as tmp_field_name
Partitioned tables and window functions
create external table analysis.revenue (region string,country string, cost int) partitioned by(p_date string) row format delimited fields terminated by '\t'
Generate some data:
#include <stdlib.h>
#include <stdio.h>
int main()
{
  FILE *fp = fopen("/Users/bytedance/Downloads/query_result20200101.tsv", "w");
  fprintf(fp, "NA\tUS\t80\n");
  fprintf(fp, "ASIA\tCN\t210\n");
  fprintf(fp, "ASIA\tJP\t60\n");
  fclose(fp);
  fp = fopen("/Users/bytedance/Downloads/query_result20200102.tsv", "w");
  fprintf(fp, "NA\tUS\t100\n");
  fprintf(fp, "NA\tCA\t20\n");
  fprintf(fp, "ASIA\tCN\t200\n");
  fprintf(fp, "ASIA\tJP\t50\n");
  fprintf(fp, "EU\tFR\t50\n");
  fclose(fp);
  return 0;
}
When loading data into a partitioned table, the partition must be specified:
loadLocalFile("/Users/bytedance/Downloads/query_result20200101.tsv");
loadLocalFile("/Users/bytedance/Downloads/query_result20200102.tsv");
executeSql("create external table analysis.revenue (region string,country string, cost int) partitioned by(p_date string) row format delimited fields terminated by '\t'");
executeSql("load data inpath '/input/query_result20200101.tsv' overwrite into table analysis.revenue partition(p_date=20200101)");
executeSql("load data inpath '/input/query_result20200102.tsv' overwrite into table analysis.revenue partition(p_date=20200102)");
Suppose we want to compute, for each region, the change relative to the previous day.
Window functions
window_function(column_name) over (partition by column_name order by column_name_1) as column_alias
The part in the second pair of parentheses is called the window clause. A window function can also take no arguments, or more than one.
The classic window functions are row_number(), rank() and dense_rank().
They number rows according to the order by column. With ties, rank() falls back in line with row_number() once the next distinct value is reached, whereas dense_rank() is simply the previous value + 1 (hence "dense"). For example, for costs 100, 90, 90, 80: row_number gives 1, 2, 3, 4; rank gives 1, 2, 2, 4; dense_rank gives 1, 2, 2, 3.
select revenue.*, row_number() over (partition by region order by cost desc) from analysis.revenue
The row number starts at 1 at the beginning of each partition; when the next row belongs to a new partition (a different region in this example), row_number resets to 1. Since the outer query has no order by, the rows come back ordered as in the window: by region first, and within a region by cost desc, so row_number increases one by one within each region.
If the outer query orders by cost, the row_number values do not change, but the rows come back in a different order.
select revenue.*, row_number() over (partition by region order by cost desc) from analysis.revenue order by cost desc;
If the window function is a sum, the value is the sum from the start of the partition up to the current row.
select revenue.*, sum(cost) over (partition by region order by cost desc) from analysis.revenue;
The summation range can also be restricted with a frame clause (a complete example follows the two fragments below):
order by cost ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
order by cost ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING
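Put together, a complete statement with a frame clause could look like this (a sketch in the same JDBC test style as before; the method and alias names are arbitrary):
@org.junit.Test
public void testWindowFrame() throws Exception {
  Class.forName("org.apache.hive.jdbc.HiveDriver");
  Connection conn = DriverManager.getConnection("jdbc:hive2://n227-026-077:10000/analysis", "root", "");
  // Sum of the current row plus up to 3 preceding rows within each region.
  PreparedStatement stmt =
      conn.prepareStatement(
          "select revenue.*, sum(cost) over (partition by region order by cost desc "
              + "rows between 3 preceding and current row) as running_sum from analysis.revenue");
  ResultSet rs = stmt.executeQuery();
  while (rs.next()) {
    // columns: region, country, cost, p_date, running_sum
    System.out.println(rs.getString(1) + " " + rs.getString(2) + " " + rs.getInt(3) + " " + rs.getLong(5));
  }
}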
lead and lag return the value at a given offset. A default value can be supplied; without one, missing values become NULL.
select revenue.*, lead(cost,1) over (partition by region order by cost desc) from analysis.revenue order by cost desc;
This fills each row's window column with the next row's cost. If the next row belongs to a different region, the column (default name lead_window_0) is NULL.
select revenue.*, lead(cost,1,999) over (partition by region order by cost desc) as tmp_field from analysis.revenue;
Now back to the per-region comparison with the previous day mentioned earlier.
First compute each region's total per day:
select sum(cost) as cost_sum, region,p_date from analysis.revenue group by region,p_date order by region,p_date desc;
Since p_date is sorted in descending (lexicographic) order, the more recent date comes first, so we use the lead function:
select region,p_date,cost_sum,lead(cost_sum,1,0) over(partition by region order by p_date desc) as yesterday_cost from(select sum(cost) as cost_sum, region,p_date from analysis.revenue group by region,p_date order by region,p_date desc) tmp_table;
Wrap one more query around this to compute the ratio; here NULL (in SQL, 1/0, 0/0 and 0.0/0.0 all yield NULL) can be treated as 0, as sketched below.
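One way to write that outer layer, reusing the JDBC pattern from the tests above (a sketch; nvl turns the NULL produced by dividing by zero back into 0, and the method and alias names are arbitrary):
@org.junit.Test
public void testDayOverDayRatio() throws Exception {
  Class.forName("org.apache.hive.jdbc.HiveDriver");
  Connection conn = DriverManager.getConnection("jdbc:hive2://n227-026-077:10000/analysis", "root", "");
  // Inner layers: daily totals per region, then lead() to fetch the previous day's total.
  // Outer layer: the ratio, with NULL (division by zero) mapped back to 0 via nvl.
  String sql =
      "select region, p_date, cost_sum, yesterday_cost, "
          + "nvl(round((cost_sum - yesterday_cost) / yesterday_cost, 4), 0) as dod_ratio "
          + "from (select region, p_date, cost_sum, "
          + "lead(cost_sum, 1, 0) over (partition by region order by p_date desc) as yesterday_cost "
          + "from (select sum(cost) as cost_sum, region, p_date from analysis.revenue "
          + "group by region, p_date) t1) t2";
  PreparedStatement stmt = conn.prepareStatement(sql);
  ResultSet rs = stmt.executeQuery();
  while (rs.next()) {
    System.out.println(rs.getString(1) + " " + rs.getString(2) + " " + rs.getDouble(5));
  }
}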
UDF development
One more dependency is needed (ideally the version should match the Hive we installed; there is no 3.2.1 version of hive-exec, so we use 3.1.2):
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>3.1.2</version>
</dependency>
A UDF extends one of the following two classes.
A simple UDF extends
org.apache.hadoop.hive.ql.exec.UDF
A more advanced UDF (one that works with Map, Struct and other complex types) extends
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
Suppose we have a simple UDF class like this:
package com.ts;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class TSUDF extends UDF{
  public Text evaluate(Text input) {
    if (input == null) return null;
    return new Text("Hello " + input.toString());
  }

  public int evaluate(int a, int b) {
    return a + b;
  }
}
We implement one or more evaluate methods; parameters can be Java primitive types or Hadoop's Text class (for strings).
Build the jar with Maven, say original-untitled3-1.0-SNAPSHOT.jar (the dependencies do not need to be bundled).
Copy it to the machine where Hive is installed.
Open a beeline session and run:
add jar /home/tangsong.math/repos/original-untitled3-1.0-SNAPSHOT.jar;
create temporary function ts_demo as 'com.ts.TSUDF';
The name after as is the fully qualified name of our UDF class. The function can now be used under the name ts_demo:
select ts_demo('abc');
select ts_demo(1,2);
If the arguments are not a single string or two ints, an error is reported:
0: jdbc:hive2://localhost:10000> select ts_demo(1,2,3);
Error: Error while compiling statement: FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments '3': No matching method for class com.ts.TSUDF with (int, int, int). Possible choices: FUNC(int, int) FUNC(string) (state=42000,code=10014)
Since this is a temporary function, it is gone once we exit beeline and reconnect.
In fact temporary functions belong to the session; in a new session you have to start again from the add jar step.
To drop a temporary function:
DROP TEMPORARY FUNCTION IF EXISTS ts_demo;
Create a temporary function:
CREATE TEMPORARY FUNCTION <function_name> AS '<fully.qualified.ClassName>';
Drop a temporary function:
DROP TEMPORARY FUNCTION [IF EXISTS] <function_name>;
Create a permanent function:
CREATE FUNCTION [db_name.]function_name AS class_name
[USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
Drop a permanent function:
DROP FUNCTION [IF EXISTS] function_name;
I tested creating a permanent function from the local file system:
CREATE FUNCTION ts_demo AS 'com.ts.TSUDF'
USING JAR 'file:///home/tangsong.math/repos/original-untitled3-1.0-SNAPSHOT.jar';
This fails with:
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.FunctionTask. Hive warehouse is non-local, but file:///home/tangsong.math/repos/original-untitled3-1.0-SNAPSHOT.jar specifies file on local filesystem. Resources on non-local warehouse should specify a non-local scheme/path (state=08S01,code=1)
So try uploading the jar to HDFS instead.
Rename the jar to hiveUDF1.jar.
In Hadoop's bin directory run:
./hadoop fs -put /home/tangsong.math/repos/hiveUDF1.jar /input
CREATE FUNCTION ts_demo AS 'com.ts.TSUDF'
USING JAR 'hdfs://10.227.26.77:9000/input/hiveUDF1.jar';
This creates the permanent function.
Now suppose the jar is deleted from HDFS right away.
The function does not stop working immediately in the current beeline session, but after exiting and reopening beeline, calling ts_demo reports that the function cannot be found.
drop function then prints an ERROR log entry but still succeeds.
show functions like 'ts*'
confirms that the function has been removed.
The UDF class we extended above is actually marked Deprecated in recent versions.
To build a more powerful UDF, extend org.apache.hadoop.hive.ql.udf.generic.GenericUDF instead.
Suppose our function should remove a given set of keys from a Map<String,?> and return the updated map.
Usage:
ts_demo(map,key1,key2,...)
In Hive, desc function function_name shows a function's help text.
For our custom function, though, it only prints
There is no documentation for function 'ts_demo'
To fix this, annotate the UDF with org.apache.hadoop.hive.ql.exec.Description.
The annotation has three String members: value, extended and name.
name: the function name in Hive.
value: a description of the function and its parameters.
extended: additional details such as examples; printed by DESCRIBE FUNCTION EXTENDED name. (The annotation is shown in use in the sketch further below.)
GenericUDF is an abstract class; to extend it you must override the following three methods:
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors)
throws UDFArgumentException {
return null;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
return null;
}
@Override
public String getDisplayString(String[] strings) {
return null;
}
Note that if our UDF declares member fields, they are usually marked transient,
which keeps them from being serialized when the UDF object itself is serialized.
initialize(ObjectInspector[] arguments)
Note that the ObjectInspector returned by initialize must be consistent with what your function eventually returns; argument validation is also mainly done here.
evaluate(GenericUDF.DeferredObject[] arguments)
The main logic of your function goes here.
getDisplayString(String[] children)
This returns some descriptive text that is shown when the user runs explain on the SQL statement. Obviously this is displayed in a different context from the @Description content, which is shown when the user runs desc function.
It is usually written as
return getStandardDisplayString("your_function_desc", strings);
First, the ObjectInspector class: it wraps an argument's type so that it can be inspected.
For example, to validate that the function receives exactly 2 arguments:
if (arguments.length != 2) {
throw new UDFArgumentLengthException("function ts_demo must take 2 args");
}
Suppose we want to validate the type of the first argument (here, a map with string keys).
ObjectInspector is an interface; it has a nested enum Category:
public interface ObjectInspector extends Cloneable {
String getTypeName();
  ObjectInspector.Category getCategory();

  public static enum Category {
    PRIMITIVE,
    LIST,
    MAP,
    STRUCT,
    UNION;

    private Category() {}
  }
}
To check the type of argument 0 you can call
arguments[0].getCategory()
or, more crudely, use instanceof:
if (!(arguments[0] instanceof MapObjectInspector)) {
throw new UDFArgumentTypeException(0,
"map_sort_to_json: the arg need map value, but found " + arguments[0]
.getTypeName());
}
mapOI = (MapObjectInspector) arguments[0];
if (!(mapOI.getMapKeyObjectInspector() instanceof StringObjectInspector)) {
throw new UDFArgumentTypeException(0,
"map_sort_to_json: the first arg must defined with map<string, ?>, but found map<"
+ mapOI.getMapKeyObjectInspector().getTypeName() + ", ?>");
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
The evaluate method has to work with Hive's internal data representations. To extract the corresponding Java types, ObjectInspector is used together with GenericUDF.DeferredObject:
public interface DeferredObject {
  void prepare(int var1) throws HiveException;

  Object get() throws HiveException;
}
Call DeferredObject.get() to obtain the underlying object, then use the matching ObjectInspector (e.g. MapObjectInspector.getMap) to convert it into Java objects.
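Putting these pieces together, a rough sketch of the key-removal UDF described above could look like the following (an illustrative implementation under the stated assumptions: string map keys, string key arguments, and a plain java.util.Map as the return value; unlike the map_sort_to_json snippet quoted above, which returns a string, this one returns the pruned map; the class name is arbitrary):
package com.ts;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

// Hypothetical example: the name and description texts are illustrative only.
@Description(
    name = "ts_demo",
    value = "_FUNC_(map, key1, key2, ...) - returns the map with the given keys removed",
    extended = "Example: select _FUNC_(extra_info, 'price') from game_info;")
public class TSMapRemoveKeysUDF extends GenericUDF {

  // transient: these inspectors are rebuilt by initialize() and must not be serialized.
  private transient MapObjectInspector mapOI;
  private transient StringObjectInspector[] keyOIs;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length < 2) {
      throw new UDFArgumentLengthException("ts_demo expects a map plus at least one key");
    }
    if (!(arguments[0] instanceof MapObjectInspector)) {
      throw new UDFArgumentTypeException(0, "ts_demo: the first arg must be a map, found " + arguments[0].getTypeName());
    }
    mapOI = (MapObjectInspector) arguments[0];
    if (!(mapOI.getMapKeyObjectInspector() instanceof StringObjectInspector)) {
      throw new UDFArgumentTypeException(0, "ts_demo: the first arg must be a map<string, ?>");
    }
    keyOIs = new StringObjectInspector[arguments.length - 1];
    for (int i = 1; i < arguments.length; i++) {
      if (!(arguments[i] instanceof StringObjectInspector)) {
        throw new UDFArgumentTypeException(i, "ts_demo: keys to remove must be strings");
      }
      keyOIs[i - 1] = (StringObjectInspector) arguments[i];
    }
    // The returned inspector must describe what evaluate() actually returns:
    // a plain java.util.Map with String keys and "standard" Java values.
    return ObjectInspectorFactory.getStandardMapObjectInspector(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        ObjectInspectorUtils.getStandardObjectInspector(mapOI.getMapValueObjectInspector()));
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    Object mapObj = arguments[0].get();
    if (mapObj == null) {
      return null;
    }
    // Collect the keys to remove as Java Strings.
    Set<String> toRemove = new HashSet<>();
    for (int i = 1; i < arguments.length; i++) {
      Object key = arguments[i].get();
      if (key != null) {
        toRemove.add(keyOIs[i - 1].getPrimitiveJavaObject(key));
      }
    }
    StringObjectInspector keyOI = (StringObjectInspector) mapOI.getMapKeyObjectInspector();
    ObjectInspector valueOI = mapOI.getMapValueObjectInspector();
    Map<String, Object> result = new HashMap<>();
    for (Map.Entry<?, ?> entry : mapOI.getMap(mapObj).entrySet()) {
      String key = keyOI.getPrimitiveJavaObject(entry.getKey());
      if (!toRemove.contains(key)) {
        result.put(key, ObjectInspectorUtils.copyToStandardObject(entry.getValue(), valueOI));
      }
    }
    return result;
  }

  @Override
  public String getDisplayString(String[] children) {
    return getStandardDisplayString("ts_demo", children);
  }
}
After packaging the jar and running add jar / create temporary function as before, it can be invoked as, for example, select ts_demo(extra_info, 'price') from game_info.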
Connecting to the Hive metastore from Java requires the following dependency:
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-metastore</artifactId>
  <version>1.2.2</version>
</dependency>
On multi-level partitioned tables. Suppose the partitioned table is:
create table if not exists analysis.partition_demo (id int, name string) partitioned by (year string,month string) row format delimited fields terminated by '\t' collection items terminated by ',' map keys terminated by ':'
Mock a few rows of data:
insert into table analysis.partition_demo partition(year='2020',month='01') values(1,'zhuzhu');
insert into table analysis.partition_demo partition(year='2021',month='02') values(1,'tangsong');
Next we implement wait_until_hive_partition_ready.
Since a multi-level partitioned table is actually stored as nested directories, the levels can be separated with /:
the existence of partition(year='2020',month='01') corresponds to the existence of the partition spec year=2020/month=01.
@org.junit.Test
public void test10() throws Exception {
  HiveConf hiveConf = new HiveConf();
  hiveConf.set("hive.metastore.uris", "thrift://10.227.26.77:9083");
  HiveMetaStoreClient hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
  List<String> tablesList = hiveMetaStoreClient.getAllTables("analysis");
  tablesList.forEach(System.out::println);
  Table table = hiveMetaStoreClient.getTable("analysis", "partition_demo");
  List<FieldSchema> fieldSchemaList = table.getSd().getCols();
  for (FieldSchema fieldSchema : fieldSchemaList) {
    System.out.println(fieldSchema.getName());
    System.out.println(fieldSchema.getType());
  }
  // Implements the wait_until_hive_partition_ready check
  Partition partition =
      hiveMetaStoreClient.getPartition("analysis", "partition_demo", "year=2020/month=01");
  System.out.println(partition);
  partition =
      hiveMetaStoreClient.getPartition("analysis", "partition_demo", "year=2020/month=0123");
  System.out.println(partition);
}
getPartition throws NoSuchObjectException when the partition does not exist.
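So wait_until_hive_partition_ready can be sketched as a polling loop around getPartition (a rough sketch; the method name, timeout and sleep interval are arbitrary, and the relevant imports are org.apache.hadoop.hive.metastore.HiveMetaStoreClient and org.apache.hadoop.hive.metastore.api.NoSuchObjectException):
public static boolean waitUntilHivePartitionReady(
    HiveMetaStoreClient client, String db, String table, String partitionSpec, long timeoutMillis)
    throws Exception {
  long deadline = System.currentTimeMillis() + timeoutMillis;
  while (System.currentTimeMillis() < deadline) {
    try {
      // Succeeds only if the partition spec (e.g. "year=2020/month=01") exists.
      client.getPartition(db, table, partitionSpec);
      return true;
    } catch (NoSuchObjectException e) {
      Thread.sleep(10_000L); // not ready yet, poll again
    }
  }
  return false;
}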