转化操作:DStream的转化操作可以分为无状态(stateless)和有状态(stateful)两种。
·在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。之前的RDD转化操作,如map()、filter()、reduceByKey()等,都是无状态转化操作。
·有状态转化操作需要使用之前批次的数据或者是中间结果来计算变量当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。
表:部分DStream无状态转化操作的例子
函数名称 | 目的 | Scala实例 | 用来操作DStream[T]的用 户自定义函数的函数签名 |
map() | 对DStream中的每个元素应用给定函数,返回由各元素输出 的元素组成的DStream。 | ds.map(x => x + 1) | f: (T) -> U |
flatMap() | 对Stream中的每个元素应用给定函数,返回各个元素输出的 迭代器组成的DStream。 | ds.flatMap(x => x.split(" ")) | f: T -> Iterable[U] |
filter() | 返回由给定DStream中通过筛选的元素组成的DStream。 | ds.filter(x => x != 1) | f: T -> Boolean |
repartition() | 改变DStream的分区数。 | ds.repartition(10) | N/A |
reduceByKey() | 将每个批次中键相同的记录归约。 | ds.reduceByKey ((x, y) => x + y)) | f: T, T -> T |
groupByKey() | 将每个批次中的记录根据键分组。 | ds.groupByKey() | N/A |
无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。
有状态转化操作
DStream的有状态转化操作时跨时间跟踪数据的操作;也就是说,一些先前批次的数据也被用来在新的批次中计算结果。主要的两种滑动窗口类型是滑动窗口和updateStateBykey(),前者以一个时间阶段为滑动窗口进行操作,后者则用来跟踪每个键的变化(例如构建一个代表用户会话的对象)。 基于窗口的转化操作
所有基于窗口的操作都需要两个参数,为窗口时长以及滑动步长,两者都必须是StreamContext的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的WindowDuration/batchInterval个批次。
例:如何在Scala中使用Window()对窗口进行计数。
val accessLogsWindow = accessLogsDStream.Window(Second(30), Second(10)) val windowCounts = accessLogsWindow.count()例例:如何在Java中使用Window()对窗口进行计数。
JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window( Durations.seconds(30), Durations.seconds(10)); JavaDStream<Integer> windowCounts = accessLogsWindow.count();Spark Streaming还提供了一些其他的窗口操作,让用户可以高效而方便地进行归约操作。reduceByWindow()和reduceByKeyAndWindow()它们接受一个归约函数,在整个窗口上执行,比如+。除此之外,它们还有一种特殊形式,通过只考虑进入新窗口的数据和离开窗口的数据,让Spark增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如+对应的一函数为 -。对于较大的窗口,提供逆函数可以大大提高效率。
UpdateStateByKey转化操作
需要在DStream中跨批次维护状态,此时updateStateByKey()为我们提供了对一个变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的DStream,并传递一个如何根据新的事件更新每个对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键,状态)对。
要使用updateStateByKey(),提供了一个update(events,oldState)函数,接受与某键相关的事件以及该键之前的状态,返回这个键对应的新状态。这个函数的签名如下所示:
· events: 是在当前批次中收到的事件的列表(可能为空)。
· oldState: 是一个可选的状态对象,存放在Option内;如果一个键没有之前的状态,这个值可以空缺。
· newState:由函数返回,也以Option形式存在;我们可以返回一个空的Option来表示想要删除该状态。
updateStateByKey()的结果会是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。
例:在Java中使用updateStateByKey()运行响应代码的计数
class UpdateRunningSum implements Function2<List<Long>>, Optional<Long>, Optional<Long>{ public Optional<Long> call(List<Long>, nums, Optional<Long>, current){ long sun = current.or(0L); return Optional.of(sum + nums.size()); } } JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair( new PairFunction<ApacheAccessLog, Integer, Long>(){ public Tuple2<Integer, Long> call(ApacheAccessLog log){ return new Tuple2(log.getResponseCode(), 1L); } }) .updateStateByKey(new UpdateRunningSum());