网站建设代码大全,公众号文章模板素材,双控机制建设网站,汽车网站营销窗口理解
窗口#xff08;Window#xff09;是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中#xff0c;再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
对窗口的正确理解#xff…窗口理解
窗口Window是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。
对窗口的正确理解 我们将窗口理解为一个一个的水桶数据流stream就像水流每个数据都会分发到对应的桶中当达到结束时间时对每个桶中收集的数据进行计算处理 注 Flink中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口
窗口的分类
按照驱动类型分
时间窗口Time Window
以时间来定义窗口的开始和结束获取某一段时间内的数据类比于我们的定时发车
计数窗口Count Window
计数窗口是基于元素的个数来获取窗口达到固定个数时就计算并关闭窗口。类比于我们的人齐才发车
按照窗口分配数据的规则分类
滚动窗口Tumbling Window
窗口之间没有重叠也不会有间隔的首尾相撞状态这样每个数据都会被分到一个窗口而且只会属于一个窗口。 滚动窗口的应用非常广泛它可以对每个时间段做聚合统计很多BI分析指标都可以用它来实现。
DataStreamT input ...;// 滚动 event-time 窗口
input.keyBy(key selector).window(TumblingEventTimeWindows.of(Time.seconds(5))).windowed transformation(window function);// 滚动 processing-time 窗口
input.keyBy(key selector).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).windowed transformation(window function);// 长度为一天的滚动 event-time 窗口 偏移量为 -8 小时。
input.keyBy(key selector).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).windowed transformation(window function);
滑动窗口Sliding Windows
滑动窗口大小也是固定的但是窗口之间并不是首尾相接的而是重叠的。
DataStreamT input ...;// 滑动 event-time 窗口
input.keyBy(key selector).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).windowed transformation(window function);// 滑动 processing-time 窗口
input.keyBy(key selector).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).windowed transformation(window function);// 滑动 processing-time 窗口偏移量为 -8 小时
input.keyBy(key selector).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).windowed transformation(window function);
会话窗口Session Windows
会话窗口是基于“会话”session来对数据进行分组的会话窗口只能基于时间来定义。
DataStreamT input ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(key selector).window(EventTimeSessionWindows.withGap(Time.minutes(10))).windowed transformation(window function);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(key selector).window(EventTimeSessionWindows.withDynamicGap((element) - {// 决定并返回会话间隔})).windowed transformation(window function);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(key selector).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).windowed transformation(window function);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(key selector).window(ProcessingTimeSessionWindows.withDynamicGap((element) - {// 决定并返回会话间隔})).windowed transformation(window function);
全局窗口
这种窗口对全局有效会把相同的key的所有数据分配到同一个窗口中这种窗口没有结束时间默认不会触发计算如果希望对数据进行处理需要自定义“触发器”。
DataStreamT input ...;input.keyBy(key selector).window(GlobalWindows.create()).windowed transformation(window function);
计数窗口
计数窗口概念非常简单本身底层是基于全局窗口Global Window实现的。Flink为我们提供了非常方便的接口直接调用.countWindow()方法
滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size表示窗口的大小。
stream.keyBy(...).countWindow(10)滑动计数窗口
与滚动计数窗口类似不过需要在.countWindow()调用时传入两个参数size和slide前者表示窗口大小后者表示滑动步长。
stream.keyBy(...).countWindow(103)窗口函数Window Functions
定义了 window assigner 之后我们需要指定当窗口触发之后我们如何计算每个窗口中的数据 这就是 window function 的职责了 窗口函数有三种ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
ReduceFunction
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
DataStreamTuple2String, Long input ...;input.keyBy(key selector).window(window assigner).reduce(new ReduceFunctionTuple2String, Long() {//v1 和v2是 2个相同类型的输入参数public Tuple2String, Long reduce(Tuple2String, Long v1, Tuple2String, Long v2) {return new Tuple2(v1.f0, v1.f1 v2.f1);}});AggregateFunction
ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型输入数据的类型(IN)、累加器的类型ACC和输出数据的类型OUT。
/*** The accumulator is used to keep a running sum and a count. The {code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunctionTuple2String, Long, Tuple2Long, Long, Double {Overridepublic Tuple2Long, Long createAccumulator() {return new Tuple2(0L, 0L);}Overridepublic Tuple2Long, Long add(Tuple2String, Long value, Tuple2Long, Long accumulator) {return new Tuple2(accumulator.f0 value.f1, accumulator.f1 1L);}Overridepublic Double getResult(Tuple2Long, Long accumulator) {return ((double) accumulator.f0) / accumulator.f1;}Overridepublic Tuple2Long, Long merge(Tuple2Long, Long a, Tuple2Long, Long b) {return new Tuple2(a.f0 b.f0, a.f1 b.f1);}
}DataStreamTuple2String, Long input ...;input.keyBy(key selector).window(window assigner).aggregate(new AverageAggregate());接口中有四个方法
createAccumulator()创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次。add()将输入的元素添加到累加器中。getResult()从累加器中提取聚合的输出结果。merge()合并两个累加器并将合并后的状态作为一个累加器返回。
可以看到AggregateFunction的工作原理是首先调用createAccumulator()为任务初始化一个状态累加器而后每来一个数据就调用一次add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用getResult()方法得到计算结果。很明显与ReduceFunction相同AggregateFunction也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。
ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable 以及用来获取时间和状态信息的 Context 对象比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的 因为窗口中的数据无法被增量聚合而需要在窗口触发前缓存所有数据。
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(127.0.0.1, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String keyedStream sensorDS.keyBy(WaterSensor::getId);WindowedStreamWaterSensor, String, TimeWindow sensorWS keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String key, Context context, IterableWaterSensor elements, CollectorString out) {// 上下文可以拿到window对象还有其他东西侧输出流 等等long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key key 的窗口[ windowStart , windowEnd )包含 count 条数据 elements);}}).print();env.execute();}
}增量聚合和全窗口函数的结合使用
在实际应用中我们往往希望兼具这两者的优点把它们结合在一起使用。 我们之前在调用WindowedStream的.reduce()和.aggregate()方法时只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外其实还可以传入第二个参数一个全窗口函数可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionWindowFunctionTRKW function) // ReduceFunction与ProcessWindowFunction结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionProcessWindowFunctionTRKW function)// AggregateFunction与WindowFunction结合
public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunctionWindowFunctionVRKW windowFunction)// AggregateFunction与ProcessWindowFunction结合
public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunction,ProcessWindowFunctionVRKW windowFunction)