1. Installation

These notes assume macOS.

  1. Install the JDK
    (see the JDK download page)
  2. Install pyspark
pip install pyspark
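
A quick smoke test for the install is to start a local session and print the version (a minimal sketch; "smoke-test" is just a placeholder app name):

from pyspark.sql import SparkSession

# Create a throwaway local session and print the installed Spark version.
spark = SparkSession.builder.master("local[*]").appName("smoke-test").getOrCreate()
print(spark.version)
spark.stop()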

2. Reading HDFS Files

  1. Reading JSON
    Note: multi-line JSON must be read in "multiLine" mode, otherwise the read fails (see the sketch after this list).
data_path = "./test_file.json"  # local path
# data_path = "hdfs://..."
df = spark.read.json(data_path)  # default: expects one JSON record per line
df = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(data_path)  # multi-line JSON
  2. Reading Parquet
data_path = "hdfs://..."  
df = spark.read.parquet(data_path)
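
To make the multiLine note from the JSON item concrete, here is a minimal sketch (it assumes a SparkSession named spark already exists; the file name multi_line.json and the sample records are made up for illustration):

# Hypothetical local file whose records span several lines (a JSON array).
sample = """[
  {"name": "Alberto", "age": 2},
  {"name": "Dakota", "age": 2}
]"""
with open("multi_line.json", "w") as f:
    f.write(sample)

# Without multiLine, Spark expects one JSON object per line, so a file like this
# typically ends up in a _corrupt_record column; with multiLine it parses fine.
df_json = spark.read.option("multiLine", True).json("multi_line.json")
df_json.show()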

3. Basic Operations

3.1 Creating a SparkSession

Before anything else, create a SparkSession (the entry point for running Spark code; think of it as your handle for interacting with Spark):
See: pyspark.sql module

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local")
         .appName("Word Count")
         .config("spark.some.config.option", "some-value")
         .getOrCreate())
# spark = SparkSession.builder.appName('mu').master('local').getOrCreate()

  • If you hit the following error:
Traceback (most recent call last):
  File "/Users/my_name/caogao/code_test_1/code_test_pyspark.py", line 5, in <module>
    spark = SparkSession.builder.master("local").getOrCreate()
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 376, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 315, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/java_gateway.py", line 1569, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.

add the following at the top of the script:

import pyspark

# Bind the driver explicitly to localhost so the 'sparkDriver' service can start.
conf = pyspark.SparkConf().set('spark.driver.host', '127.0.0.1')
sc = pyspark.SparkContext(master='local', appName='myAppName', conf=conf)
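
The error message itself also suggests setting spark.driver.bindAddress; if you prefer to configure this through the SparkSession builder rather than a raw SparkContext, a sketch of the equivalent settings:

from pyspark.sql import SparkSession

# Same fix expressed as SparkSession config (both keys are standard Spark properties).
spark = (SparkSession.builder
         .master("local")
         .appName("myAppName")
         .config("spark.driver.host", "127.0.0.1")
         .config("spark.driver.bindAddress", "127.0.0.1")
         .getOrCreate())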

Reference: solution

3.2 Creating a mock data table

test = []
test.append((1, 'age', '30', 50, 40))
test.append((1, 'city', 'beijing', 50, 40))
test.append((1, 'gender', 'fale', 50, 40))
test.append((1, 'height', '172cm', 50, 40))
test.append((1, 'weight', '70kg', 50, 40))
test.append((2, 'age', '26', 100, 80))
test.append((2, 'city', 'beijing', 100, 80))
test.append((2, 'gender', 'fale', 100, 80))
test.append((2, 'height', '170cm', 100, 80))
test.append((2, 'weight', '65kg', 100, 80))
test.append((3, 'age', '35', 99, 99))
test.append((3, 'city', 'nanjing', 99, 99))
test.append((3, 'gender', 'female', 99, 99))
test.append((3, 'height', '161cm', 99, 99))
test.append((3, 'weight', '50kg', 99, 99))
df = spark.createDataFrame(test,
                           ['user_id', 'attr_name', 'attr_value', 'income', 'expenses'])

Or simply:

df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
                           ['Id', 'Name', 'Sallary', 'DepartmentId'])
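
If you want explicit control over column types rather than relying on type inference, createDataFrame also accepts a schema built from pyspark.sql.types (a sketch; df2 is just an illustrative name and the 'Sallary' spelling follows the example above):

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("Id", LongType(), True),
    StructField("Name", StringType(), True),
    StructField("Sallary", LongType(), True),
    StructField("DepartmentId", LongType(), True),
])
df2 = spark.createDataFrame([(1, 'Joe', 70000, 1), (2, 'Henry', 80000, None)], schema)
df2.printSchema()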

3.3 Querying

3.3.1 Row-level operations

1. Printing data

df.show() prints the first 20 rows by default; you can also pass the number of rows you want printed.

If some values are very long, PySpark truncates them and the output gets cut off; in that case use df.show(truncate=False).

>>> df.show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
|      1|      age|        30|    50|      40|
|      1|     city|   beijing|    50|      40|
|      1|   gender|      fale|    50|      40|
|      1|   height|     172cm|    50|      40|
|      1|   weight|      70kg|    50|      40|
|      2|      age|        26|   100|      80|
|      2|     city|   beijing|   100|      80|
|      2|   gender|      fale|   100|      80|
|      2|   height|     170cm|   100|      80|
|      2|   weight|      65kg|   100|      80|
|      3|      age|        35|    99|      99|
|      3|     city|   nanjing|    99|      99|
|      3|   gender|    female|    99|      99|
|      3|   height|     161cm|    99|      99|
|      3|   weight|      50kg|    99|      99|
+-------+---------+----------+------+--------+
 
>>> df.show(3)
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
|      1|      age|        30|    50|      40|
|      1|     city|   beijing|    50|      40|
|      1|   gender|      fale|    50|      40|
+-------+---------+----------+------+--------+
only showing top 3 rows

2. Printing the schema

>>> df.printSchema()
root
 |-- user_id: long (nullable = true)
 |-- attr_name: string (nullable = true)
 |-- attr_value: string (nullable = true)
 |-- income: long (nullable = true)
 |-- expenses: long (nullable = true)

3. Counting rows

>>> df.count()
15

4. Fetching the first few rows to the driver

>>> rows = df.head(3)
>>> df.head(3)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40)]
>>> df.take(5)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40)]
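
If pandas is available on the driver, another common way to inspect a few rows locally is to convert a bounded slice to a pandas DataFrame (a sketch; pdf is just an illustrative name):

# Requires pandas on the driver; limit() keeps the amount of data transferred small.
pdf = df.limit(3).toPandas()
print(pdf)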

5. Selecting rows where a column is null

>>> from pyspark.sql.functions import isnull
>>> df.filter(isnull("income")).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
+-------+---------+----------+------+--------+

6. Returning everything as a list whose elements are Row objects:

>>> df.collect()
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40), Row(user_id=2, attr_name=u'age', attr_value=u'26', income=100, expenses=80), Row(user_id=2, attr_name=u'city', attr_value=u'beijing', income=100, expenses=80), Row(user_id=2, attr_name=u'gender', attr_value=u'fale', income=100, expenses=80), Row(user_id=2, attr_name=u'height', attr_value=u'170cm', income=100, expenses=80), Row(user_id=2, attr_name=u'weight', attr_value=u'65kg', income=100, expenses=80), Row(user_id=3, attr_name=u'age', attr_value=u'35', income=99, expenses=99), Row(user_id=3, attr_name=u'city', attr_value=u'nanjing', income=99, expenses=99), Row(user_id=3, attr_name=u'gender', attr_value=u'female', income=99, expenses=99), Row(user_id=3, attr_name=u'height', attr_value=u'161cm', income=99, expenses=99), Row(user_id=3, attr_name=u'weight', attr_value=u'50kg', income=99, expenses=99)]

Note: collect() pulls all of the data to the driver and returns a Python list. Each element of the list is a Row, and you can in turn pull individual values out of a Row.

>>> rows = df.collect()
 
>>> rows[0]
Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40)
>>> rows[0][1]
u'age'
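
Row objects also support name-based access and conversion to a plain dict, which is often more readable than positional indexing (a small sketch on the rows list from above):

row = rows[0]
row.asDict()       # {'user_id': 1, 'attr_name': 'age', ...}
row['attr_name']   # access by field name
row.attr_name      # attribute-style access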

7. Summary statistics with describe

>>> df.describe().show()
+-------+------------------+---------+------------------+-----------------+------------------+
|summary|           user_id|attr_name|        attr_value|           income|          expenses|
+-------+------------------+---------+------------------+-----------------+------------------+
|  count|                15|       15|                15|               15|                15|
|   mean|               2.0|     null|30.333333333333332|             83.0|              73.0|
| stddev|0.8451542547285166|     null| 4.509249752822894|24.15722311383137|25.453037988757707|
|    min|                 1|      age|             161cm|               50|                40|
|    max|                 3|   weight|           nanjing|              100|                99|
+-------+------------------+---------+------------------+-----------------+------------------+

8. Deduplication with distinct

>>> df.select('user_id').distinct().show()
+-------+                                                                       
|user_id|
+-------+
|      1|
|      3|
|      2|
+-------+
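
To deduplicate on a subset of columns rather than whole rows, dropDuplicates accepts a column list (a short sketch):

# distinct() removes fully duplicated rows; dropDuplicates keeps one
# (arbitrary) row per value of the given columns.
df.dropDuplicates(['user_id']).show()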

3.3.2 Column operations

1. Selecting one or more columns: select

df.select("age").show()
df["age"]
df.age
df.select(“name”)
df.select(df[‘name’], df[‘age’]+1)
df.select(df.a, df.b, df.c)    # 选择a、b、c三列
df.select(df["a"], df["b"], df["c"])    # 选择a、b、c三列

2. Filtering with where

>>> df.where("income = 50" ).show()                         
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
|      1|      age|        30|    50|      40|
|      1|     city|   beijing|    50|      40|
|      1|   gender|      fale|    50|      40|
|      1|   height|     172cm|    50|      40|
|      1|   weight|      70kg|    50|      40|
+-------+---------+----------+------+--------+
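
where (an alias of filter) also accepts column expressions, so several conditions can be combined (a minimal sketch on the same table):

from pyspark.sql.functions import col

# AND two conditions; use | for OR and ~ for NOT, and keep the parentheses.
df.where((col("income") == 50) & (col("expenses") < 50)).show()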

3.3.3 Sorting

1. orderBy: sort by the given column(s), ascending by default

>>> df.orderBy(df.income.desc()).show()         
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
|      2|   gender|      fale|   100|      80|
|      2|   weight|      65kg|   100|      80|
|      2|   height|     170cm|   100|      80|
|      2|      age|        26|   100|      80|
|      2|     city|   beijing|   100|      80|
|      3|   gender|    female|    99|      99|
|      3|      age|        35|    99|      99|
|      3|   height|     161cm|    99|      99|
|      3|   weight|      50kg|    99|      99|
|      3|     city|   nanjing|    99|      99|
|      1|      age|        30|    50|      40|
|      1|   height|     172cm|    50|      40|
|      1|     city|   beijing|    50|      40|
|      1|   weight|      70kg|    50|      40|
|      1|   gender|      fale|    50|      40|
+-------+---------+----------+------+--------+
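
orderBy also takes several columns, with the direction settable per column (a sketch):

from pyspark.sql.functions import asc, desc

# income descending first, then expenses ascending as a tie-breaker.
df.orderBy(desc("income"), asc("expenses")).show()
# Equivalent: df.orderBy(["income", "expenses"], ascending=[False, True]).show()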

3.3.4 Sampling

sample draws a random sample of rows. withReplacement = True/False controls whether sampling is done with replacement, the second argument is the sampling fraction, and 42 is the random seed.

t1 = df.sample(False, 0.2, 42)  # roughly 20% of rows, without replacement, seed 42

3.4 Adding, dropping, and modifying columns

  1. Add a column with withColumn
    For example, add a new column whose value is 0 in every row (dropping and modifying columns are sketched at the end of this section):
from pyspark.sql.functions import lit
df.withColumn('newCol', lit(0)).show()
## output
+---+-----+-------+------------+------+
| Id| Name|Sallary|DepartmentId|newCol|
+---+-----+-------+------------+------+
|  1|  Joe|  70000|           1|     0|
|  2|Henry|  80000|        null|     0|
+---+-----+-------+------------+------+
  2. Rename columns (see: pyspark系列–dataframe基础)
# Method 1
# Name the columns when creating the DataFrame
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
                              schema=['name','length'])
data.show()
data.printSchema()

# Method 2
# selectExpr: "original_column as new_column";
# cast changes the type of the entire column
# (color_df below is assumed to be an existing DataFrame with 'color' and 'length' columns)
color_df2 = color_df.selectExpr('cast(color as long) as color2','length as length2')
color_df2.show()

# Method 3
# withColumnRenamed
color_df2 = color_df.withColumnRenamed('color','color2')\
                    .withColumnRenamed('length','length2')
color_df2.show()

# Method 4
# alias
color_df.select(color_df.color.alias('color2')).show()
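
As promised in the section title, columns can also be dropped or modified; a minimal sketch (df_dropped and df_casted are illustrative names, and df here is assumed to be the Id/Name/Sallary/DepartmentId table from 3.2):

from pyspark.sql.functions import col

df_dropped = df.drop('DepartmentId')                               # drop a column
df_casted = df.withColumn('Sallary', col('Sallary').cast('int'))   # overwrite a column to "modify" it
df_casted.printSchema()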

3.5 Aggregation with groupBy

>>> df.groupby('Sallary').count().show()    # here df is the Id/Name/Sallary table from 3.2
+-------+-----+
|Sallary|count|
+-------+-----+
|  70000|    1|
|  80000|    1|
+-------+-----+

3.6 More complex usage

from pyspark.sql import functions as F
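
For instance, F is typically used for grouped aggregations and conditional columns; a hedged sketch on the mock table from 3.2 (the alias names and the threshold 300 are made up for illustration):

# Per-user totals plus a derived flag column.
stats = (df.groupBy('user_id')
           .agg(F.sum('income').alias('total_income'),
                F.avg('expenses').alias('avg_expenses')))

stats = stats.withColumn('high_income',
                         F.when(F.col('total_income') > 300, 1).otherwise(0))
stats.show()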

References:

  1. Spark 2.2.x 中文文档
  2. Pyspark数据基础操作集合(DataFrame)
  3. PySpark-DataFrame各种常用操作举例
  4. (超详细)PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理
  5. pyspark.sql module