RDD与DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]").appName("TextFileApp").getOrCreate()
data = spark.createDataFrame(([1,12,100],[1,12,100],[1,12,100],[2,12,200],[3,13,100]),['ID','time','Consumer'])
data.show()
data.groupby('ID','time').sum('Consumer').show()建立一个df
from pyspark import SparkContext
sc = SparkContext('local')
doc = sc.parallelize([['a', 'b', 'c'], ['b', 'd', 'd']]) # - parallelize()函数将一个List列表转化为了一个RDD对象
print('doc.collect(): ',doc.collect()) # - collect()函数将这个RDD对象转化为了一个List列表
print('doc.glom().collect(): ',doc.glom().collect()) # - glom()函数显示分区情况
words = doc.flatMap(lambda x: x).distinct().collect() # flat map 铺平映射 # distinct() 去重
print('words: ',words)
word_dict = {w: i for w, i in zip(words, range(len(words)))}
print('word_dict: ',word_dict)
word_dict_b = sc.broadcast(word_dict)
def wordCountPerDoc(d):
dict_new = {}
wd = word_dict_b.value
for w in d:
if wd[w] in dict_new:
dict_new[wd[w]] += 1
else:
dict_new[wd[w]] = 1
return dict_new
print(doc.map(wordCountPerDoc).collect()) #map , 映射一个新的RDD
RDD的操作
一. spark中的RDD是什么,有哪些特性?
分布式数据集,它代表一个不可变,可分区,里面的元素可以并行计算的集合
二. RDD操作
.parallelize()函数将一个List列表转化为了一个RDD对象
.collect()函数将这个RDD对象转化为了一个List列表
.glom()函数显示分区情况
(1).map()
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成
(2).fiter()
返问一个新数据集,由经过func函数计算后返回值为true的输入元素组成
(3).flatMap()
类似于map,先映射后扁平化(flatten)
(4).mapPartitions()
mapPartitions将会被每一个数据集分区调用一次,各个数据集分区的全部内容将作为顺序的数据流传入函数func的参数中,被合并的结果自动转换成为新的RDD.
三. 宽窄依赖
宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生
区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖

京公网安备 11010502036488号