RDD与DataFrame

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("TextFileApp").getOrCreate()
data = spark.createDataFrame(([1,12,100],[1,12,100],[1,12,100],[2,12,200],[3,13,100]),['ID','time','Consumer'])
data.show()
data.groupby('ID','time').sum('Consumer').show()

建立一个df

from pyspark import SparkContext

sc = SparkContext('local')
doc = sc.parallelize([['a', 'b', 'c'], ['b', 'd', 'd']])    # - parallelize()函数将一个List列表转化为了一个RDD对象
print('doc.collect():     ',doc.collect())                            # - collect()函数将这个RDD对象转化为了一个List列表
print('doc.glom().collect():     ',doc.glom().collect())                     # - glom()函数显示分区情况

words = doc.flatMap(lambda x: x).distinct().collect()       # flat map    铺平映射      # distinct() 去重
print('words:     ',words)

word_dict = {w: i for w, i in zip(words, range(len(words)))}
print('word_dict:    ',word_dict)

word_dict_b = sc.broadcast(word_dict)

def wordCountPerDoc(d):
    dict_new = {}
    wd = word_dict_b.value
    for w in d:
        if wd[w] in dict_new:
            dict_new[wd[w]] += 1
        else:
            dict_new[wd[w]] = 1
    return dict_new

print(doc.map(wordCountPerDoc).collect())           #map , 映射一个新的RDD

RDD的操作


一. spark中的RDD是什么,有哪些特性?
分布式数据集,它代表一个不可变,可分区,里面的元素可以并行计算的集合

二. RDD操作
.parallelize()函数将一个List列表转化为了一个RDD对象
.collect()函数将这个RDD对象转化为了一个List列表
.glom()函数显示分区情况

(1).map()
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成
(2).fiter()
返问一个新数据集,由经过func函数计算后返回值为true的输入元素组成
(3).flatMap()
类似于map,先映射后扁平化(flatten)
(4).mapPartitions()
mapPartitions将会被每一个数据集分区调用一次,各个数据集分区的全部内容将作为顺序的数据流传入函数func的参数中,被合并的结果自动转换成为新的RDD.

三. 宽窄依赖

宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生

窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生

区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖