Spark Streaming 概述
Spark Streaming 是Spark Core的扩展,能够水平扩展,高吞吐量,有容错机制的进行对实时数据进行流处理。
能够对接Kafka(常用),Flume等中的数据
处理的数据能够写到文件系统,数据库或者dashboard上
因为Spark Streaming 是基于Spark Core 的,所以在Spark安装完成之后可以直接使用。
Spark Streaming 的工作流程:Spark Streaming 接收流数据流,并把数据流切分成批次(batches)
Spark Streaming 的核心抽象叫做DStream,代表一段连续的数据流
DStream 能够从类似Kafka等数据源中创建,也可以从其他DStream中创建
内部上DStream是RDD序列。
Spark Core 的程序入口是SparkContext
Spark SQL的程序入口是SparkSession
Spark Streaming 的程序入口是StreamingContext
创建一个有两个线程的local 的SparkContext,命名为NetWordCount
然后创建batch间隔为1s 的 StreamingContext
用这个StreamingContext来创建一个DStream监听本地的9999端口
在这个DStream中每一个record代表的是一行text,对text进行wordCount操作
Spark Streaming 要有执行代码start() 和等待计算结束的代码awaitTermination()
完整代码如下:
Spark Streaming的执行需要使用NetCat进行输入和监听
执行效果如下图所示
Spark Streaming 核心概念
Spark Streaming编程的时候,不要把master参数硬编码到代码中,而是要使用参数进行传入
batch间隔需要根据业务延迟的需求进行设置,batch interval参数就是Spark Streaming拆分数据流的依据
建立好Spark Streaming之后,要做:
- 创建DStream表示输入流
- 使用transformation 和 output opeatations(类似RDD的action)处理DStream,代表流计算
- 调用start方法开始计算
- 调用awaitTermination方法等待计算完毕
- 调用stop方法结束(实际上可能用不到,如果是24小时开启)
注意:
- 一旦一个context已经被启动了,就不能再向context中添加新的计算了
- 一旦context被停止,就不能被重启
- 一个JVM中只能启动一个Spark Streaming
- stop操作停止StreamingContext时也会使得SparkContext停止
- 一个SparkContext可以被多个StreamingContext复用
SparkStreaming中socketTextStream接收三个参数,host,port,storageLevel,其中storageLevel默认是MEMORY_AND_DISK_2,这个与Spark Core中的MEMORY_ONLY是不同的
DStream是SparkStreaming最基本的抽象,代表一段连续的数据流。
从数据源创建或者是通过DStream的transformation转换过来
DStream在本质上代表连续的RDD序列
对DStream进行的转换操作,实际上都是对DStream中的RDDs进行的操作
DStream中的转换操作
DStream的ouput operations
Spark Streaming 的textFileStream可以创建一个 能够监控hadoop兼容的文件系统 的input stream,有新的文件进来就会以textFile的格式读取。需要传入directory目录参数
开启Streaming进程后,创建一个文件并移动到directory目录下,使用mv的方式
如果是hdfs上的文件只需要改变一下参数即可