1、产生背景

每一个Spark应用程序往往都会以加载一个数据源开始,保存数据结束。为了让用户可以方便地从不同的数据源(json、parquet、rdbms),经过混合处理(json join parquet)再将处理结果以特定的格式(json、parquet)写回到指定的系统(HDFS、S3等)上去,SparkSQL1.2引入了外部数据源API。

2、操作Parquet数据

Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目。Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方便的将其它序列化工具生成的数据转换成Parquet格式。

  • 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
  • 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
  • 数据模型: Avro, Thrift, Protocol Buffers, POJOs

读取Parquet数据:spark.read.format("parquet").load(path)

写入Parquet数据:dataframe.format("parquet").save(path)

import org.apache.spark.sql.SparkSession

/**
  * @author YuZhansheng
  * @desc Parquet文件操作
  * @create 2019-03-06 17:08
  */
object ParquetAPP {
    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("ParquetAPP").master("local[2]").getOrCreate()

        val path = "file:/soft/spark/examples/src/main/resources/users.parquet"

        //加载Parquet文件为DataFrame:这是标准写法,也可以不指定format,如userDF2
        //但是,如果是parquet以外的其他数据源,比如json,不指定format会报错,就不能使用userDF2的方式了
        //SparkSQL默认处理的format就是parquet!!!
        val userDF = spark.read.format("parquet").load(path)
        val userDF2 = spark.read.load(path)

        //读取parquet文件还可以使用如下的写法:
        spark.read.format("parquet").option("path",path).load().show()

        userDF.printSchema()
        userDF.show()
        userDF2.show()

        //把parquet写成json
        userDF.select("name","favorite_color").write.format("json").save("/root/DataSet/jsonout")

        spark.stop()
    }
}


        //加载Parquet文件为DataFrame:这是标准写法,也可以不指定format,如userDF2
        //但是,如果是parquet以外的其他数据源,比如json,不指定format会报错,就不能使用userDF2的方式了
        //SparkSQL默认处理的format就是parquet!!!

3、操作Hive表数据

Spark可以通过读取hive的元数据来兼容hive,读取hive的表数据,然后在spark引擎中进行sql统计分析,从而,通过sparksql与hive结合实现数据分析将成为一种最佳实践。Spark-shell操作Hive表中数据需要先启动Hadoop和MySQL。

读:spark.table(tableName)

写:df.write.saveAsTable(tableName)

4、操作MySQL表数据

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://hadoop0:3306/hive_remote?user=root&password=18******","dbtable" -> "TBLS","driver" -> "com.mysql.jdbc.Driver")).load()

获取到DataFrame之后,剩下的操作就可以使用SQL语句操作了。

5、综合使用MySQL和Hive


import org.apache.spark.sql.SparkSession

/**
  * @author YuZhansheng
  * @desc  综合使用MySQL和Hive
  * @create 2019-03-07 10:33
  */
object HiveMysqlApp {

    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("HiveMysqlApp").master("local[2]").getOrCreate()

        //加载hive表数据
        val hiveDF = spark.table("emp")

        //加载mysql表数据
        val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://hadoop0:3306/hive_remote?user=root&password=123").load()

        //将hive表中数据和mysql表数据做join操作
        val resultDF = hiveDF.join(mysqlDF,hiveDF.col("deptnum") === mysqlDF.col("DEPT"))
        resultDF.show()
        
        spark.stop()
    }
}