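Preface
This post shares my understanding of Flink windows. Only once windows are clearly understood can you choose the right windowing approach for a given scenario.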
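I. Basic window concepts
1. What a window is
2. Window categories
PS: On a stream grouped by key, use window(), which builds one set of windows per key. On an ungrouped stream, use windowAll() (the non-keyed APIs all carry the All suffix). Example of the difference: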
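The difference between keyed and non-keyed windows can be sketched in plain Java, without any Flink dependency. This is only a model of the behaviour, not Flink code: the class `CountWindowSketch` and its methods are hypothetical names introduced for illustration. It mimics what `keyBy(...).countWindow(2)` and `countWindowAll(2)` would do with the score data used later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of keyed vs. non-keyed count windows (illustrative, not Flink code). */
final class CountWindowSketch {

    /** One input record: a grouping key plus a score. */
    static final class Score {
        final String key;
        final int value;

        Score(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keyed: every key fills its own window, which fires once it holds `size` elements. */
    static List<String> keyed(List<Score> input, int size) {
        Map<String, List<Integer>> open = new LinkedHashMap<>();
        List<String> fired = new ArrayList<>();
        for (Score s : input) {
            List<Integer> buf = open.computeIfAbsent(s.key, k -> new ArrayList<>());
            buf.add(s.value);
            if (buf.size() == size) {
                fired.add(s.key + buf); // e.g. "class1[100, 78]"
                buf.clear();
            }
        }
        return fired;
    }

    /** Non-keyed: all elements, whatever their key, share one sequence of windows. */
    static List<String> nonKeyed(List<Score> input, int size) {
        List<Integer> buf = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        for (Score s : input) {
            buf.add(s.value);
            if (buf.size() == size) {
                fired.add("all" + buf);
                buf.clear();
            }
        }
        return fired;
    }

    /** The sample scores used in the examples of this post. */
    static List<Score> sample() {
        return Arrays.asList(
                new Score("class1", 100), new Score("class1", 78), new Score("class1", 99),
                new Score("class2", 81), new Score("class2", 59), new Score("class2", 97));
    }
}
```

With the sample data and a size of 2, `keyed` fires one window per class (`class1[100, 78]` and `class2[81, 59]`) and leaves the third score of each class waiting in an unfilled window, while `nonKeyed` fires three windows that mix both classes. In Flink itself, a non-keyed window is also processed by a single task, so it does not run in parallel.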
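3. Window lifecycle
4. Window Assigner
5. Types of Window Assigner (window subcategories)
Tumbling windows
Using tumbling windows
Sliding windows
Using sliding windows
Session windows
Using session windows
PS: session windows can only be time-based
Global windows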
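How the three time-based assigners place an element can be illustrated with a little arithmetic. The following is a plain-Java sketch, not Flink's implementation: `WindowAssignSketch` and its method names are hypothetical, timestamps are plain `long` values, and a window offset of zero is assumed. In Flink the corresponding assigners are `TumblingEventTimeWindows`, `SlidingEventTimeWindows` and `EventTimeSessionWindows`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of window assignment (illustrative, zero offset assumed). */
final class WindowAssignSketch {

    /** Start of the single tumbling window [start, start + size) that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of every sliding window [start, start + size) that contains ts, latest first. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Session windows over timestamps sorted in ascending order. Each element opens
     * [ts, ts + gap); a window that overlaps or touches the previous session is merged into it.
     * Every result entry is {start, end}.
     */
    static List<long[]> sessions(long[] sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTs) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = ts + gap; // extend the current session
            } else {
                result.add(new long[] {ts, ts + gap}); // start a new session
            }
        }
        return result;
    }
}
```

For example, `tumblingStart(7, 5)` gives 5, `slidingStarts(7, 10, 5)` gives the starts 5 and 0 (the element belongs to two overlapping windows), and `sessions(new long[] {1, 3, 20}, 5)` gives the two sessions [1, 8) and [20, 25). The sliding case also shows why an element is copied into roughly size / slide windows, which matters when estimating state size.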
6. Window overview
Compare the examples and note how the windows differ.
7. Predefined keyed windows
These predefined windows can be used in place of .window().
8. Predefined non-keyed windows
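II. Window functions
PS: WindowFunction/AllWindowFunction have been carried over from early versions and are now superseded by ProcessWindowFunction/ProcessAllWindowFunction.
1. ReduceFunction
2. AggregateFunction
3. FoldFunction
PS: Deprecated; the official documentation no longer recommends it.
4. WindowFunction/AllWindowFunction
PS: In 1.14.4, the version I am currently using, these are already treated as legacy, and the official documentation no longer recommends them.
5. ProcessWindowFunction/ProcessAllWindowFunction
These are the new generation of window functions.
6. Combining the new-generation window functions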
III. Triggers and evictors
1. What a trigger is
2. Fire and purge
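The distinction between firing and purging can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Flink code: `TriggerSketch` is a hypothetical class, its `Result` enum only mirrors three of the values of Flink's `TriggerResult`, and the trigger is assumed to fire after every `count` elements.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of FIRE vs. FIRE_AND_PURGE for one window (illustrative). */
final class TriggerSketch {

    /** Mirrors three of the decisions a trigger can return. */
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /**
     * Feeds the elements into a single window. After every `count` elements the
     * trigger answers `onFire`; otherwise it answers CONTINUE.
     * Returns the window contents seen by the window function at each firing.
     */
    static List<String> run(int[] elements, int count, Result onFire) {
        List<Integer> contents = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        int seen = 0;
        for (int e : elements) {
            contents.add(e);
            seen++;
            Result r = (seen % count == 0) ? onFire : Result.CONTINUE;
            if (r != Result.CONTINUE) {
                emitted.add(contents.toString()); // the window function is evaluated
            }
            if (r == Result.FIRE_AND_PURGE) {
                contents.clear(); // the buffered elements are discarded
            }
        }
        return emitted;
    }
}
```

Feeding 1, 2, 3, 4 with a count of 2 and `FIRE` evaluates the function on `[1, 2]` and then on `[1, 2, 3, 4]`, because the contents are kept. With `FIRE_AND_PURGE` it sees `[1, 2]` and then `[3, 4]`. The second behaviour corresponds to what `countWindow(n)` does in the examples later in this post.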
3. Default triggers
4. Built-in and custom triggers
5. What an evictor does
6. Built-in evictors
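As an illustration of what a count-based evictor does, here is a plain-Java sketch. It is not Flink's `CountEvictor` itself: `EvictorSketch` and its methods are hypothetical, and the eviction is assumed to happen before the window function runs.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a count-based evictor (illustrative, not Flink code). */
final class EvictorSketch {

    /** Keeps at most the last `maxCount` elements, dropping older ones from the front. */
    static List<Integer> keepLast(List<Integer> window, int maxCount) {
        int from = Math.max(0, window.size() - maxCount);
        return new ArrayList<>(window.subList(from, window.size()));
    }

    /** Window function (a sum) applied to what remains after eviction. */
    static int sumAfterEviction(List<Integer> window, int maxCount) {
        int sum = 0;
        for (int v : keepLast(window, maxCount)) {
            sum += v;
        }
        return sum;
    }
}
```

For a window holding 1, 2, 3, 4, 5 and a maximum count of 3, only 3, 4, 5 reach the function, so the sum is 12. Because an evictor needs access to the individual elements, the window has to buffer all of them, so the memory savings of incremental aggregation are lost.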
IV. Handling lateness and using window results
1. How to allow lateness
2. Retrieving late data
3. Notes on late elements
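What happens to an element depends on where the watermark stands relative to its window when the element arrives. The sketch below models that decision in plain Java. It is an approximation for event-time windows under stated assumptions: `LatenessSketch` and `Fate` are hypothetical names, a window [start, end) has the maximum timestamp end - 1, and all values are plain `long` timestamps. In Flink the lateness is configured with `allowedLateness(...)`, and dropped elements can be captured with `sideOutputLateData(...)`.

```java
/** Plain-Java model of how a window treats an arriving element (illustrative). */
final class LatenessSketch {

    /** What happens to an element that arrives for a given window. */
    enum Fate { ON_TIME, LATE_FIRING, DROPPED }

    /**
     * @param windowEnd       exclusive end of the element's window
     * @param allowedLateness extra time the window state is kept after it first fires
     * @param watermark       current watermark when the element arrives
     */
    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME; // window has not fired yet; the element is simply added
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_FIRING; // window already fired; the element causes another firing
        }
        return Fate.DROPPED; // window state is gone; the element is dropped or side-output
    }
}
```

For a window ending at 10 with an allowed lateness of 5, an element arriving at watermark 8 is on time, one arriving at watermark 9 or 13 causes a late firing, and one arriving at watermark 14 is dropped. Each late firing emits an updated result for the same window, so downstream consumers have to be prepared for duplicates or updates.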
4. Working with window results
5. Interaction between watermarks and windows
6. Notes on estimating window state size
V. Window function examples
1.AggregateFunction
package spendreport.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestAggFunctionOnWindow {

    // (class, student, score)
    private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100D),
        Tuple3.of("class1", "李四", 78D),
        Tuple3.of("class1", "王五", 99D),
        Tuple3.of("class2", "赵六", 81D),
        Tuple3.of("class2", "钱七", 59D),
        Tuple3.of("class2", "马二", 97D),
    };

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);
        // Average score per class, computed over count windows of 2 elements.
        // Each class has 3 records, so the third one stays in a window that never fills.
        DataStream<Double> avgScore = input.keyBy(
            new KeySelector<Tuple3<String, String, Double>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Double> value)
                        throws Exception {
                    return value.f0;
                }
            }).countWindow(2).aggregate(new AverageAggregate());
        avgScore.print();
        env.execute();
    }

    // Accumulator is (sum of scores, number of scores).
    private static class AverageAggregate implements
            AggregateFunction<Tuple3<String, String, Double>, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return new Tuple2<>(0D, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Tuple3<String, String, Double> value,
                Tuple2<Double, Long> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f2, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1,
                Tuple2<Double, Long> acc2) {
            return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
        }
    }
}
2.ReduceFunction
package spendreport.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReduceFunctionOnWindow {

    // (class, student, score)
    private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100),
        Tuple3.of("class1", "李四", 78),
        Tuple3.of("class1", "王五", 99),
        Tuple3.of("class2", "赵六", 81),
        Tuple3.of("class2", "钱七", 59),
        Tuple3.of("class2", "马二", 97),
    };

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH_TRANSCRIPT);
        // Total score per class over count windows of 2 elements.
        DataStream<Tuple3<String, String, Integer>> totalPoints = input.keyBy(
            new KeySelector<Tuple3<String, String, Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value)
                        throws Exception {
                    return value.f0;
                }
            }).countWindow(2).reduce(
            new ReduceFunction<Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> reduce(
                        Tuple3<String, String, Integer> v1,
                        Tuple3<String, String, Integer> v2) throws Exception {
                    // Keep the class and name of the first record and add up the scores.
                    return new Tuple3<>(v1.f0, v1.f1, v1.f2 + v2.f2);
                }
            });
        totalPoints.print();
        env.execute();
    }
}
3.ProcessWindowFunction
package spendreport.window;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class TestProcessWinFunctionOnWindow {

    // (class, student, score)
    private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
        Tuple3.of("class1", "张三", 100D),
        Tuple3.of("class1", "李四", 78D),
        Tuple3.of("class1", "王五", 99D),
        Tuple3.of("class2", "赵六", 81D),
        Tuple3.of("class2", "钱七", 59D),
        Tuple3.of("class2", "马二", 97D),
    };

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);
        // Average score per class over count windows of 2 elements.
        DataStream<Double> avgScore = input.keyBy(
            new KeySelector<Tuple3<String, String, Double>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Double> value)
                        throws Exception {
                    return value.f0;
                }
            }).countWindow(2).process(new MyProcessWindowFunction());
        avgScore.print();
        env.execute();
    }

    // Type parameters: input, output, key, window. countWindow() uses GlobalWindow.
    public static class MyProcessWindowFunction extends
            ProcessWindowFunction<Tuple3<String, String, Double>, Double, String, GlobalWindow> {

        @Override
        public void process(String key,
                Context context,
                Iterable<Tuple3<String, String, Double>> elements,
                Collector<Double> out) throws Exception {
            // All elements of the window are buffered and handed over at once.
            double sum = 0D;
            long count = 0L;
            for (Tuple3<String, String, Double> tp : elements) {
                sum += tp.f2;
                count++;
            }
            out.collect(sum / count);
        }
    }
}
PS: In this example, the third type parameter of MyProcessWindowFunction (KEY) must match the type returned by the KeySelector passed to keyBy().
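Summary
Window functions can be written in many ways and are very flexible. When applying them to a business scenario, do not rush into coding; work out the requirements first.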