JDBC/ODBC

Spark SQL也提供JDBC连接支持。JDBC服务器作为一个独立的Spark驱动器程序运行,可以在多用户之间共享。任何一个客户端都可以在内存中缓存数据表,对表进行查询。集群的资源和缓存的数据都在所用用户之间共享。
Spark SQL的JBDC服务器与Hive中的HiveServer2相一致。由于使用了Thrift通信协议,它也被称为“Thrift server”。注意,JDBC服务器支持需要Spark在打开Hive支持的选项下编译。Spark也自带了Beeline客户端程序,可以使用它连接JDBC服务器。

**当启动JDBC服务器时,JDBC服务器会在后台运行并且将所有输出重定向到一个日志文件中。如果使用JDBC服务器进行查询过程中遇到了问题,可以查看日志寻找更为完整的报错信息。

许多外部工具也可以通过ODBC连接到Spark SQL。Spark SQL的ODBC服务器驱动由Simba(http://www.simba.com/)制作,可以在很多Spark供应商处下载到。它经常会被像Microstrategy或Tableau这样的商务智能工具所用到。由于Spark SQL使用了和Hive相同的查询语言和服务器,大多数可以连接到Hive的商务智能工具也可以通过已有的Hive连接器来连接到Spark SQL上。

使用Beeline

在Beeline客户端中,你可以使用标准的HiveQL命令来创建,列举以及查询数据表。可以从Hive语言手册(http://cwiki.apache.org/confluence/display/Hive/LanguageManual)中找到关于HiveQL的所有语法细节,这里展示一些常见的操作。
使用CREATE TABLE命令从本地创建一张数据表,然后使用LOAD DATA命令读取数据。Hive支持读取带有固定分隔符的文本文件,比如CSV等格式的文件。
要列举数据表,可以使用SHOW TABLES语句,也可以通过DESCRIBE tableName查看每张表的结构信息。
如果想缓存数据表,使用CACHE TABLE tableName语句。缓存之后可以使用UNCACHE TABLE tableName命令取消对表的缓存。需要注意,缓存的表会在这个JDBC服务器上的所有客户端之间共享。
在Beeline中查看查询计划很简单,对查询语句运行EXPLAIN即可。

用户自定义函数

用户自定义函数(UDF),在Spark SQL中,编写UDF尤为简单。Spark SQL不仅有自己的UDF接口,也支持已有的Apache Hive UDF。

Spark SQL UDF

我们可以使用Spark支持的编程语言编写号函数,然后通过Spark SQL内建方法传递进来,非常便捷的注册我们自己的UDF。在Scala可Python中,可以利用语言原生的函数和lambda语法的支持,而在Java中,则需要扩展对应的UDF类。UDF能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。
在Python和Scala中,还需要用SchemaRDD对应的类型来指定返回值类型。Java中的对应类型可以在org.apache.spark.sql.api.java.DataType中找到,而在Python中则需要导入DataType支持。
例:Python版本的计算字符串长度UDF
#写一个求字符串长度的UDF
hiveCtx.registerFunction("strLenPython",lambda X: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
在Java中定义UDF需要一些额外的import声明。和在定义RDD函数时一样,根据我们要实现的UDF的参数个数,需要扩展特定的类。
例:Java UDF import声明
//导入UDF函数类以及数据类型
//注意:这些import路径可能会在将来的发行版中改变
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
例:Java版本的字符串长度UDF
hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>(){
    @Override
    public Integer call(String str) throws Exception {
        return str.length();
    }
},DataTypes.IntegerType);
SchemaRDD tweetLength = hiveCtx.sql(
"SELECT stringLengthJava('text') FROM tweets LIMIT 10");
List<Row> lengths = tweetLength.collect();
for(Row row : result) {
    System.out.println(row.get(0));
}