1. Installation
These steps are based on macOS.
- Install the JDK
JDK download page
- Install PySpark
pip install pyspark
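A quick way to confirm that PySpark and the JDK are wired up correctly is to start a local session and build a tiny DataFrame (a minimal sketch, not part of the original steps; the app name is arbitrary):
from pyspark.sql import SparkSession

# Start a local session; this fails quickly if the JDK is missing or misconfigured.
spark = SparkSession.builder.master("local[*]").appName("install-check").getOrCreate()

# Build and show a one-row DataFrame to confirm the install works end to end.
spark.createDataFrame([(1, "ok")], ["id", "status"]).show()
spark.stop()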
2. Reading HDFS files
- Reading JSON
Note: if the JSON spans multiple lines, enable the "multiLine" option, otherwise the read will fail.
data_path = "./test_file.json"  # local file
# data_path = "hdfs://..."
df = spark.read.json(data_path)  # works when each line is a separate JSON object
df = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(data_path)  # for multi-line JSON
- Reading Parquet
data_path = "hdfs://..."
df = spark.read.parquet(data_path)
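Writing a DataFrame back out is symmetric; a minimal sketch, assuming df is the DataFrame read above and using a placeholder output path:
out_path = "hdfs://..."  # placeholder output path; a local path also works
df.write.mode("overwrite").parquet(out_path)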
3. Basic operations
2.1 Creating a SparkSession
Before doing anything else, create a SparkSession, the entry point for running Spark code (you can think of it as your handle for interacting with Spark):
See: pyspark.sql module
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# spark = SparkSession.builder.appName('mu').master('local').getOrCreate()
- If you hit the following error:
Traceback (most recent call last):
  File "/Users/my_name/caogao/code_test_1/code_test_pyspark.py", line 5, in <module>
    spark = SparkSession.builder.master("local").getOrCreate()
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 376, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 315, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/java_gateway.py", line 1569, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
then add the following at the top of your script:
import pyspark
conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1')
sc = pyspark.SparkContext(master='local', appName='myAppName',conf=conf)
Reference: solution
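Alternatively (a sketch, not from the referenced solution), the same driver host can be set directly on the SparkSession builder:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("myAppName") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()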
2.2 Creating a mock data table
test = []
test.append((1, 'age', '30', 50, 40))
test.append((1, 'city', 'beijing', 50, 40))
test.append((1, 'gender', 'fale', 50, 40))
test.append((1, 'height', '172cm', 50, 40))
test.append((1, 'weight', '70kg', 50, 40))
test.append((2, 'age', '26', 100, 80))
test.append((2, 'city', 'beijing', 100, 80))
test.append((2, 'gender', 'fale', 100, 80))
test.append((2, 'height', '170cm', 100, 80))
test.append((2, 'weight', '65kg', 100, 80))
test.append((3, 'age', '35', 99, 99))
test.append((3, 'city', 'nanjing', 99, 99))
test.append((3, 'gender', 'female', 99, 99))
test.append((3, 'height', '161cm', 99, 99))
test.append((3, 'weight', '50kg', 99, 99))
df = spark.createDataFrame(
    test, ['user_id', 'attr_name', 'attr_value', 'income', 'expenses'])
Or directly:
df = spark.createDataFrame(
    [('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
    ['Id', 'Name', 'Sallary', 'DepartmentId'])
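If you want to control the column types rather than let Spark infer them, you can pass an explicit schema; a sketch (the StringType choices simply mirror the string data above):
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Id", StringType(), True),  # True = nullable
    StructField("Name", StringType(), True),
    StructField("Sallary", StringType(), True),
    StructField("DepartmentId", StringType(), True),
])
df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)], schema)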
2.3 Querying
2.3.1 Row-level operations
1. Printing data
df.show() prints the first 20 rows by default; pass a number to print a specific count.
If some values are very long, PySpark truncates them in the output; in that case use df.show(truncate=False).
>>> df.show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1| age| 30| 50| 40|
| 1| city| beijing| 50| 40|
| 1| gender| fale| 50| 40|
| 1| height| 172cm| 50| 40|
| 1| weight| 70kg| 50| 40|
| 2| age| 26| 100| 80|
| 2| city| beijing| 100| 80|
| 2| gender| fale| 100| 80|
| 2| height| 170cm| 100| 80|
| 2| weight| 65kg| 100| 80|
| 3| age| 35| 99| 99|
| 3| city| nanjing| 99| 99|
| 3| gender| female| 99| 99|
| 3| height| 161cm| 99| 99|
| 3| weight| 50kg| 99| 99|
+-------+---------+----------+------+--------+
>>> df.show(3)
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1| age| 30| 50| 40|
| 1| city| beijing| 50| 40|
| 1| gender| fale| 50| 40|
+-------+---------+----------+------+--------+
only showing top 3 rows
2. Printing the schema
>>> df.printSchema()
root
|-- user_id: long (nullable = true)
|-- attr_name: string (nullable = true)
|-- attr_value: string (nullable = true)
|-- income: long (nullable = true)
|-- expenses: long (nullable = true)
3. Counting rows
>>> df.count()
15
4. Fetching the first few rows to the driver
>>> list = df.head(3)
>>> df.head(3)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40)]
>>> df.take(5)
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40)]
5. Filtering rows where a column is null
>>> from pyspark.sql.functions import isnull
>>> df.filter(isnull("income")).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
+-------+---------+----------+------+--------+
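No row has a null income here, so the result is empty. The reverse check, and wholesale null handling, look like this (a sketch, not from the original post):
# keep only rows where income is NOT null
df.filter(df.income.isNotNull()).show()

# or drop / fill null values across the whole DataFrame
df.na.drop().show()
df.na.fill(0).show()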
6. collect() returns a Python list whose elements are Row objects:
>>> df.collect()
[Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40), Row(user_id=2, attr_name=u'age', attr_value=u'26', income=100, expenses=80), Row(user_id=2, attr_name=u'city', attr_value=u'beijing', income=100, expenses=80), Row(user_id=2, attr_name=u'gender', attr_value=u'fale', income=100, expenses=80), Row(user_id=2, attr_name=u'height', attr_value=u'170cm', income=100, expenses=80), Row(user_id=2, attr_name=u'weight', attr_value=u'65kg', income=100, expenses=80), Row(user_id=3, attr_name=u'age', attr_value=u'35', income=99, expenses=99), Row(user_id=3, attr_name=u'city', attr_value=u'nanjing', income=99, expenses=99), Row(user_id=3, attr_name=u'gender', attr_value=u'female', income=99, expenses=99), Row(user_id=3, attr_name=u'height', attr_value=u'161cm', income=99, expenses=99), Row(user_id=3, attr_name=u'weight', attr_value=u'50kg', income=99, expenses=99)]
Note: this pulls all of the data to the driver and returns it as a list. You can index into the list to get a Row, and index into a Row to get individual values.
>>> list = df.collect()
>>> list[0]
Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40)
>>> list[0][1]
u'age'
7. Summary statistics
>>> df.describe().show()
+-------+------------------+---------+------------------+-----------------+------------------+
|summary| user_id|attr_name| attr_value| income| expenses|
+-------+------------------+---------+------------------+-----------------+------------------+
| count| 15| 15| 15| 15| 15|
| mean| 2.0| null|30.333333333333332| 83.0| 73.0|
| stddev|0.8451542547285166| null| 4.509249752822894|24.15722311383137|25.453037988757707|
| min| 1| age| 161cm| 50| 40|
| max| 3| weight| nanjing| 100| 99|
+-------+------------------+---------+------------------+-----------------+------------------+
8. Deduplication with distinct
>>> df.select('user_id').distinct().show()
+-------+
|user_id|
+-------+
| 1|
| 3|
| 2|
+-------+
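distinct() deduplicates whole rows; to deduplicate on a subset of columns, dropDuplicates takes a column list (a sketch):
# keep one row per user_id (which row survives is not deterministic)
df.dropDuplicates(['user_id']).show()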
2.3.2 Column operations
1. Selecting one or more columns: select
df.select("age").show()
df["age"]
df.age
df.select("name")
df.select(df['name'], df['age'] + 1)
df.select(df.a, df.b, df.c)  # select columns a, b, and c
df.select(df["a"], df["b"], df["c"])  # select columns a, b, and c
2. Conditional selection with where
>>> df.where("income = 50" ).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 1| age| 30| 50| 40|
| 1| city| beijing| 50| 40|
| 1| gender| fale| 50| 40|
| 1| height| 172cm| 50| 40|
| 1| weight| 70kg| 50| 40|
+-------+---------+----------+------+--------+
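filter is an alias of where, and conditions can be combined with & and | (each sub-condition needs its own parentheses); a sketch:
df.where((df.income == 50) & (df.attr_name == 'age')).show()
df.filter((df.income > 60) | (df.expenses < 50)).show()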
2.3.3 Sorting
1. orderBy: sort by the given column(s), ascending by default
>>> df.orderBy(df.income.desc()).show()
+-------+---------+----------+------+--------+
|user_id|attr_name|attr_value|income|expenses|
+-------+---------+----------+------+--------+
| 2| gender| fale| 100| 80|
| 2| weight| 65kg| 100| 80|
| 2| height| 170cm| 100| 80|
| 2| age| 26| 100| 80|
| 2| city| beijing| 100| 80|
| 3| gender| female| 99| 99|
| 3| age| 35| 99| 99|
| 3| height| 161cm| 99| 99|
| 3| weight| 50kg| 99| 99|
| 3| city| nanjing| 99| 99|
| 1| age| 30| 50| 40|
| 1| height| 172cm| 50| 40|
| 1| city| beijing| 50| 40|
| 1| weight| 70kg| 50| 40|
| 1| gender| fale| 50| 40|
+-------+---------+----------+------+--------+
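To sort on several columns with mixed directions, pass column expressions (a sketch using pyspark.sql.functions):
from pyspark.sql import functions as F

df.orderBy(F.desc('income'), F.asc('user_id')).show()
# equivalent: df.orderBy(['income', 'user_id'], ascending=[0, 1]).show()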
2.3.4 Sampling
sample draws a random sample: withReplacement = True or False controls whether sampling is done with replacement, and 42 is the random seed.
t1 = train.sample(False, 0.2, 42)
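Note that the fraction is approximate rather than an exact row count, and fixing the seed makes the sample reproducible. A sketch on the attribute DataFrame from 2.2 (train above is assumed to be some existing DataFrame):
sampled = df.sample(False, 0.2, 42)  # without replacement, ~20% of rows, seed 42
sampled.show()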
2.4 Adding, dropping, and modifying columns
- Add a column with the withColumn method.
Add a new column whose values are all 0 (dropping and modifying columns are shown in the sketch after this example):
from pyspark.sql.functions import lit
df.withColumn('newCol', lit(0)).show()
## output
+---+-----+-------+------------+------+
| Id| Name|Sallary|DepartmentId|newCol|
+---+-----+-------+------------+------+
| 1| Joe| 70000| 1| 0|
| 2|Henry| 80000| null| 0|
+---+-----+-------+------------+------+
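Dropping a column and modifying an existing one follow the same pattern; a sketch (casting Sallary to int is just an illustration):
from pyspark.sql.functions import col

# drop a column
df.drop('DepartmentId').show()

# overwrite an existing column with withColumn
df.withColumn('Sallary', col('Sallary').cast('int')).show()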
- Renaming columns (see the "pyspark series - dataframe basics" post)
# Approach 1: rename when creating the DataFrame
data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
                             schema=['name', 'length'])
data.show()
data.printSchema()
# Approach 2: selectExpr
# "original_column as new_column"; cast changes the type of the whole column
color_df2 = color_df.selectExpr('cast(color as long) as color2','length as length2')
color_df2.show()
# Approach 3: withColumnRenamed
color_df2 = color_df.withColumnRenamed('color','color2')\
.withColumnRenamed('length','length2')
color_df2.show()
# Approach 4: alias
color_df.select(color_df.color.alias('color2')).show()
2.5 Grouped aggregation with groupBy
>>> df.groupby('Sallary').count().show()
+-------+-----+
|Sallary|count|
+-------+-----+
|  70000|    1|
|  80000|    1|
+-------+-----+
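Beyond count, agg computes several aggregates at once; a sketch on the user_id / income / expenses DataFrame built in 2.2:
from pyspark.sql import functions as F

df.groupBy('user_id').agg(
    F.sum('income').alias('total_income'),
    F.avg('expenses').alias('avg_expenses'),
).show()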
2.6 More complex usage examples
from pyspark.sql import functions as F