RDD与DataFrame
from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[2]").appName("TextFileApp").getOrCreate() data = spark.createDataFrame(([1,12,100],[1,12,100],[1,12,100],[2,12,200],[3,13,100]),['ID','time','Consumer']) data.show() data.groupby('ID','time').sum('Consumer').show()
建立一个df
from pyspark import SparkContext sc = SparkContext('local') doc = sc.parallelize([['a', 'b', 'c'], ['b', 'd', 'd']]) # - parallelize()函数将一个List列表转化为了一个RDD对象 print('doc.collect(): ',doc.collect()) # - collect()函数将这个RDD对象转化为了一个List列表 print('doc.glom().collect(): ',doc.glom().collect()) # - glom()函数显示分区情况 words = doc.flatMap(lambda x: x).distinct().collect() # flat map 铺平映射 # distinct() 去重 print('words: ',words) word_dict = {w: i for w, i in zip(words, range(len(words)))} print('word_dict: ',word_dict) word_dict_b = sc.broadcast(word_dict) def wordCountPerDoc(d): dict_new = {} wd = word_dict_b.value for w in d: if wd[w] in dict_new: dict_new[wd[w]] += 1 else: dict_new[wd[w]] = 1 return dict_new print(doc.map(wordCountPerDoc).collect()) #map , 映射一个新的RDD
RDD的操作
一. spark中的RDD是什么,有哪些特性?
分布式数据集,它代表一个不可变,可分区,里面的元素可以并行计算的集合
二. RDD操作
.parallelize()函数将一个List列表转化为了一个RDD对象
.collect()函数将这个RDD对象转化为了一个List列表
.glom()函数显示分区情况
(1).map()
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成
(2).fiter()
返问一个新数据集,由经过func函数计算后返回值为true的输入元素组成
(3).flatMap()
类似于map,先映射后扁平化(flatten)
(4).mapPartitions()
mapPartitions将会被每一个数据集分区调用一次,各个数据集分区的全部内容将作为顺序的数据流传入函数func的参数中,被合并的结果自动转换成为新的RDD.
三. 宽窄依赖
宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生
窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生
区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖