JDBC/ODBC
Spark SQL也提供JDBC连接支持。JDBC服务器作为一个独立的Spark驱动器程序运行,可以在多用户之间共享。任何一个客户端都可以在内存中缓存数据表,对表进行查询。集群的资源和缓存的数据都在所用用户之间共享。
Spark SQL的JBDC服务器与Hive中的HiveServer2相一致。由于使用了Thrift通信协议,它也被称为“Thrift server”。注意,JDBC服务器支持需要Spark在打开Hive支持的选项下编译。Spark也自带了Beeline客户端程序,可以使用它连接JDBC服务器。
**当启动JDBC服务器时,JDBC服务器会在后台运行并且将所有输出重定向到一个日志文件中。如果使用JDBC服务器进行查询过程中遇到了问题,可以查看日志寻找更为完整的报错信息。
许多外部工具也可以通过ODBC连接到Spark SQL。Spark SQL的ODBC服务器驱动由Simba(http://www.simba.com/)制作,可以在很多Spark供应商处下载到。它经常会被像Microstrategy或Tableau这样的商务智能工具所用到。由于Spark SQL使用了和Hive相同的查询语言和服务器,大多数可以连接到Hive的商务智能工具也可以通过已有的Hive连接器来连接到Spark SQL上。
使用Beeline
在Beeline客户端中,你可以使用标准的HiveQL命令来创建,列举以及查询数据表。可以从Hive语言手册(http://cwiki.apache.org/confluence/display/Hive/LanguageManual)中找到关于HiveQL的所有语法细节,这里展示一些常见的操作。
使用CREATE TABLE命令从本地创建一张数据表,然后使用LOAD DATA命令读取数据。Hive支持读取带有固定分隔符的文本文件,比如CSV等格式的文件。
要列举数据表,可以使用SHOW TABLES语句,也可以通过DESCRIBE tableName查看每张表的结构信息。
如果想缓存数据表,使用CACHE TABLE tableName语句。缓存之后可以使用UNCACHE TABLE tableName命令取消对表的缓存。需要注意,缓存的表会在这个JDBC服务器上的所有客户端之间共享。
在Beeline中查看查询计划很简单,对查询语句运行EXPLAIN即可。
用户自定义函数
用户自定义函数(UDF),在Spark SQL中,编写UDF尤为简单。Spark SQL不仅有自己的UDF接口,也支持已有的Apache Hive UDF。
Spark SQL UDF
我们可以使用Spark支持的编程语言编写号函数,然后通过Spark SQL内建方法传递进来,非常便捷的注册我们自己的UDF。在Scala可Python中,可以利用语言原生的函数和lambda语法的支持,而在Java中,则需要扩展对应的UDF类。UDF能够支持各种数据类型,返回类型也可以与调用时的参数类型完全不一样。
在Python和Scala中,还需要用SchemaRDD对应的类型来指定返回值类型。Java中的对应类型可以在org.apache.spark.sql.api.java.DataType中找到,而在Python中则需要导入DataType支持。
例:Python版本的计算字符串长度UDF
#写一个求字符串长度的UDF hiveCtx.registerFunction("strLenPython",lambda X: len(x), IntegerType()) lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
在Java中定义UDF需要一些额外的import声明。和在定义RDD函数时一样,根据我们要实现的UDF的参数个数,需要扩展特定的类。
例:Java UDF import声明
//导入UDF函数类以及数据类型 //注意:这些import路径可能会在将来的发行版中改变 import org.apache.spark.sql.api.java.UDF1; import org.apache.spark.sql.types.DataTypes;例:Java版本的字符串长度UDF
hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>(){ @Override public Integer call(String str) throws Exception { return str.length(); } },DataTypes.IntegerType); SchemaRDD tweetLength = hiveCtx.sql( "SELECT stringLengthJava('text') FROM tweets LIMIT 10"); List<Row> lengths = tweetLength.collect(); for(Row row : result) { System.out.println(row.get(0)); }