读取和存储数据

Spark SQL支持很多种结构化数据源,可以轻松从各种数据源中读取到Row对象。这些数据源包括Hive表、JSON和Parquet文件。此外,当使用SQL查询这些数据源中的数据且只用到部分字段时,Spark SQL可以智能地只扫描这些用到的字段。除此之外,还可以在程序中通过指定结构信息,将常规的RDD转化为SchemaRDD。这使得在Python或Java对象上运行SQL查询更加简单。当需要计算许多数据值时,SQL查询往往更加简洁。不仅如此,还可以自如的将这些RDD和来自其他Spark SQL数据源的SchemaRDD进行连接操作。

Apache Hive

当从Hive中读取数据时,Spark SQL支持任何Hive支持的存储格式(SerDe)。
要把Spark SQL连接到已经部署好的Hive上,需要提供一份Hive配置。只需把hive-site.xml文件复制到Spark的./conf目录下即可。
如果只是想探索下Spark SQL而没有配置文件,那么Spark SQL则会使用本地的HIve元数据仓,且同样可以轻松地将数据读取到Hive表中进行查询。
例:如何查询一张Hive表(此表只有两列,key(int)和value(string)):
#使用Python从Hive中读取
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("select key, value from mytable")
keys = rows.map(lambda row: row[0])

Parquet

Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段记录。Parquet格式经常在Hadoop生态中被使用,它也支持Spark SQL的全部数据类型。Spark SQL提供了直接读取和存储Parquet格式文件的方法。
可以通过HiveContext.parquetFile或者SQLContext.parquetFile来读取数据。
#Python中读取Parquet数据
#从一个有name和favouriteAnimal字段的Parquet文件中读取数据
rows = hiveCtx.parquetFile(parquetFile)
names = rows.map(lambda row: row.name)
print "Everyone"
print names.collect()
也可以把Parquet文件注册为Spark SQL的临时表,并在这张表上运行查询。
#Python中的Parquet数据查询
#寻找熊猫爱好者
tbl = rows.registerTempTable("People")
pandaFriends = hiveCtx.sql("select name from people where favouriteAnimal = \"panda\"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()
可以使用saveAsParquetFile()把SchemaRDD的内容以Parquet格式保存。

JSON

如果你有一个JSON文件,其中的数据记录遵循同样的结构信息,那么Spark SQL就可以通过扫描文件推测出结构信息,并且让你使用名字访问对应的字段。如果你在一个包含大量JSON文件的目录中进行尝试,就会发现Spark SQL的结构信息推断可以非常高效地操作数据,而无需写专门的代码来读取不同结构的文件。要读取JSON数据,只需要调用hiveCtx中的jsonFile()方法即可。如果想获得从数据中推断出来的结构信息,可以在生成的SchemaRDD上调用printSchema方法。
输入记录:
{"name": "Holden"}
{"name": "Sparky The Bear", "lovesPandas": true,"knows": {"friends":["holden"]}}
在Python中使用Spark SQL读取JSON数据
input = hiveCtx.jsonFile(inputFile)

基于RDD

除了读取数据,也可以基于RDD创建SchemaRDD。在Scala中就,带有case class的RDD可以隐式转换成SchemaRDD。
在Python中,可以创建一个由Row对象组成的RDD,然后调用inferSchema()。
在Python中使用row和具有名元组创建SchemaRDD
happyPeopleRDD = sc.parallelize([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
使用Scala的话,隐式转换会帮我们处理好结构信息的判断。
在Scala中基于case class创建SchemaRDD
case class HappyPerson(handle: String, favouriteBeverage: String)
    
//创建了一个人的对象,并且把它转换成SchemaRDD
val happyPeopleRDD = sc.parallelize(List(HappyPerson("holden", "coffee")))
//注意:此处发生了隐式转换
//该转换等价于sqlCtx.createSchemaRDD(happyPeopleRDD)
happyPeopleRDD.registerTempTable("happy_People")