Flink的窗口操作有哪些(flink,开发技术)

时间:2024-05-08 19:51:40 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。

Flink的窗口操作有哪些

一、窗口(window)的类型

对于窗口的操作主要分为两种,分别对于Keyedstream和Datastream。他们的主要区别也仅仅在于建立窗口的时候一个为.window(…),一个为.windowAll(…)。对于Keyedstream的窗口来说,他可以使得多任务并行计算,每一个logical key stream将会被独立的进行处理。

stream.keyBy(...)"assigner"[.trigger(...)]"trigger"(elsedefaulttrigger)[.evictor(...)]"evictor"(elsenoevictor)[.allowedLateness(...)]"lateness"(elsezero)[.sideOutputLateData(...)]"outputtag"(elsenosideoutputforlatedata).reduce/aggregate/fold/apply()"function"[.getSideOutput(...)]"outputtag"

按照窗口的Assigner来分,窗口可以分为

Tumbling window, sliding window,session window,global window,custom window

每种窗口又可分别基于processing time和event time,这样的话,窗口的类型严格来说就有很多。

还有一种window叫做count window,依据元素到达的数量进行分配,之后也会提到。

窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候。

二、窗口的操作

2.1 Tumbling window

固定相同间隔分配窗口,每个窗口之间没有重叠看图一眼明白。

Flink的窗口操作有哪些

下面的例子定义了每隔3毫秒一个窗口的流:

WindowedStreamRates=rates.keyBy(MovieRate::getUserId).window(TumblingEventTimeWindows.of(Time.milliseconds(3)));

2.2 Sliding Windows

跟上面一样,固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。

Flink的窗口操作有哪些

下面的例子给出窗口大小为10毫秒,重叠为5毫秒的流:

WindowedStreamRates=rates.keyBy(MovieRate::getUserId).window(SlidingEventTimeWindows.of(Time.milliseconds(10),Time.milliseconds(5)));

2.3 Session window

这种窗口主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。一个session window关闭通常是由于一段时间没有收到元素。在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。

Flink的窗口操作有哪些
//静态间隔时间WindowedStreamRates=rates.keyBy(MovieRate::getUserId).window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));//动态时间WindowedStreamRates=rates.keyBy(MovieRate::getUserId).window(EventTimeSessionWindows.withDynamicGap(()));

2.4 Global window

将所有相同keyed的元素分配到一个窗口里。好吧,就这样:

WindowedStreamRates=rates.keyBy(MovieRate::getUserId).window(GlobalWindows.create());

三、窗口函数

窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前两个执行得更有效,因为Flink可以增量地聚合每个到达窗口的元素。

Flink必须在调用函数之前在内部缓冲窗口中的所有元素,所以使用ProcessWindowFunction进行操作效率不高。不过ProcessWindowFunction可以跟其他的窗口函数结合使用,其他函数接受增量信息,ProcessWindowFunction接受窗口的元数据。

举一个AggregateFunction的例子吧,下面代码为MovieRate按user分组,且分配5毫秒的Tumbling窗口,返回每个user在窗口内评分的所有分数的平均值。

DataStream>Rates=rates.keyBy(MovieRate::getUserId).window(TumblingEventTimeWindows.of(Time.milliseconds(5))).aggregate(newAggregateFunction>(){@OverridepublicAverageAccumulatorcreateAccumulator(){returnnewAverageAccumulator();}@OverridepublicAverageAccumulatoradd(MovieRatemovieRate,AverageAccumulatoracc){acc.userId=movieRate.userId;acc.sum+=movieRate.rate;acc.count++;returnacc;}@OverridepublicTuple2getResult(AverageAccumulatoracc){returnTuple2.of(acc.userId,acc.sum/(double)acc.count);}@OverridepublicAverageAccumulatormerge(AverageAccumulatoracc0,AverageAccumulatoracc1){acc0.count+=acc1.count;acc0.sum+=acc1.sum;returnacc0;}});publicstaticclassAverageAccumulator{intuserId;intcount;doublesum;}

以下是部分输出:

...1>(44,3.0)4>(96,0.5)2>(51,0.5)3>(90,2.75)...

看上面的代码,会发现add()函数特别生硬,因为我们想返回Tuple2类型,即Integer为key,但AggregateFunction似乎没有提供这个机制可以让AverageAccumulator的构造函数提供参数。所以,这里引入ProcessWindowFunction与AggregateFunction的结合版,AggregateFunction进行增量叠加,当窗口关闭时,ProcessWindowFunction将会被提供AggregateFunction返回的结果,进行Tuple封装:

DataStream>Rates=rates.keyBy(MovieRate::getUserId).window(TumblingEventTimeWindows.of(Time.milliseconds(5))).aggregate(newMyAggregateFunction(),newMyProcessWindowFunction());publicstaticclassMyAggregateFunctionimplementsAggregateFunction{@OverridepublicAverageAccumulatorcreateAccumulator(){returnnewAverageAccumulator();}@OverridepublicAverageAccumulatoradd(MovieRatemovieRate,AverageAccumulatoracc){acc.sum+=movieRate.rate;acc.count++;returnacc;}@OverridepublicDoublegetResult(AverageAccumulatoracc){returnacc.sum/(double)acc.count;}@OverridepublicAverageAccumulatormerge(AverageAccumulatoracc0,AverageAccumulatoracc1){acc0.count+=acc1.count;acc0.sum+=acc1.sum;returnacc0;}}publicstaticclassMyProcessWindowFunctionextendsProcessWindowFunction,Integer,TimeWindow>{@Overridepublicvoidprocess(Integerkey,Contextcontext,Iterableresults,Collector>out)throwsException{Doubleresult=results.iterator().next();out.collect(newTuple2(key,result));}}publicstaticclassAverageAccumulator{intcount;doublesum;}

可以得到,结果与上面一样,但代码好看了很多。

四、其他操作

4.1 Triggers(触发器)

触发器定义了窗口何时准备好被窗口处理。每个窗口分配器默认都有一个触发器,如果默认的触发器不符合你的要求,就可以使用trigger(…)自定义触发器。

通常来说,默认的触发器适用于多种场景。例如,多有的event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。

PS:GlobalWindow默认的触发器时NeverTrigger,该触发器从不出发,所以在使用GlobalWindow时必须自定义触发器。

4.2 Evictors(驱逐器)

Evictors可以在触发器触发之后以及窗口函数被应用之前和/或之后可选择的移除元素。使用Evictor可以防止预聚合,因为窗口的所有元素都必须在应用计算逻辑之前先传给Evictor进行处理

4.3 Allowed Lateness

当使用event-time窗口时,元素可能会晚到,例如Flink用于跟踪event-time进度的watermark已经超过了窗口的结束时间戳。

默认来说,当watermark超过窗口的末尾时,晚到的元素会被丢弃。但是flink也允许为窗口operator指定最大的allowed lateness,以至于可以容忍在彻底删除元素之前依然接收晚到的元素,其默认值是0。

为了支持该功能,Flink会保持窗口的状态,知道allowed lateness到期。一旦到期,flink会删除窗口并删除其状态。

把晚到的元素当作side output。

SingleOutputStreamOperatorresult=input.keyBy().window().allowedLateness().sideOutputLateData(lateOutputTag).(function>);
 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:Flink的窗口操作有哪些的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Linux系统中chkconfig命令怎么用下一篇:

3 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18