I. Non-keyed window functions
Without keyBy, the windowAll variants apply a single window to the entire stream, so the windowed operator always runs with parallelism 1:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // Non-keyed windows
        // Tumbling time window (5s)
        vodStream
                .timeWindowAll(Time.seconds(5))
                .sum(1)
                .print();
        // Sliding time window (5s window, evaluated every 3s)
        vodStream
                .timeWindowAll(Time.seconds(5), Time.seconds(3))
                .sum(1)
                .print();
        // Tumbling count window (fires every 100 elements)
        vodStream
                .countWindowAll(100)
                .sum(1)
                .print();
        // Sliding count window (last 100 elements, evaluated every 80)
        vodStream
                .countWindowAll(100, 80)
                .sum(1)
                .print();
        env.execute();
    }
}
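The examples rely on two helpers the post does not show: EventTimeExtractor and KafkaUtils. A minimal sketch of both, assuming each Kafka record is a JSON string carrying an epoch-millisecond "timestamp" field, a 3-second out-of-orderness bound, and placeholder broker/group settings (each class in its own source file):

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeExtractor extends BoundedOutOfOrdernessTimestampExtractor<String> {
    public EventTimeExtractor() {
        // Assumption: events may arrive up to 3 seconds late
        super(Time.seconds(3));
    }

    @Override
    public long extractTimestamp(String line) {
        // Assumption: the JSON payload carries an epoch-millisecond "timestamp" field
        return JSON.parseObject(line).getLongValue("timestamp");
    }
}

import java.util.Properties;

public class KafkaUtils {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "vod-window-demo");         // placeholder
        return props;
    }
}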
II. Keyed window functions
After keyBy, each key gets its own independent set of windows and the window operator keeps the configured parallelism:
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // Keyed windows
        // Tumbling time window (5s)
        vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();
        // Sliding time window (5s window, evaluated every 3s)
        vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5), Time.seconds(3))
                .sum(1)
                .print();
        // Tumbling count window (fires every 100 elements per key)
        vodStream
                .keyBy(0)
                .countWindow(100)
                .sum(1)
                .print();
        // Sliding count window (last 100 elements per key, evaluated every 80)
        vodStream
                .keyBy(0)
                .countWindow(100, 80)
                .sum(1)
                .print();
        env.execute();
    }
}
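timeWindow and timeWindowAll are shorthand: they pick the window assigner from the time characteristic configured on the environment. With event time enabled as above, the two keyed time windows are equivalent to spelling the assigners out (from org.apache.flink.streaming.api.windowing.assigners):

vodStream
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum(1)
        .print();
vodStream
        .keyBy(0)
        .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
        .sum(1)
        .print();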
III. Aggregating over windows
1. Basic aggregation functions
e.g. sum and max: they aggregate a tuple field selected by position and keep only the running value as window state.
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // Non-keyed window
        // Tumbling time window with the built-in sum
        vodStream
                .timeWindowAll(Time.seconds(5))
                .sum(1)
                .print();
        env.execute();
    }
}
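max works the same way on a positional field. For illustration, the largest single "seconds" value per userid in each 5-second window:

vodStream
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .max(1)
        .print();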
2. reduce
ReduceFunction folds two elements into one as each element arrives, so the input and output types must match and only the partial result is kept as window state.
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // reduce -> ReduceFunction
        vodStream
                .timeWindowAll(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                        // Non-keyed window: collapse the key to a constant label and sum the seconds
                        return new Tuple2<>("user", t1.f1 + t2.f1);
                    }
                })
                .print();
        env.execute();
    }
}
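In a keyed window the reducer can keep the incoming key instead of collapsing it to a constant. A sketch in lambda form (no type hint is needed, because reduce never changes the element type):

vodStream
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1))
        .print();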
3. aggregate
AggregateFunction generalizes ReduceFunction: the accumulator type ACC may differ from both the input and the output type.
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // aggregate -> AggregateFunction
        vodStream
                .timeWindowAll(Time.seconds(5))
                .aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {
                    // Called when the window is created, to initialize the accumulator
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // Called for each incoming element: folds it into the accumulator ACC and returns the updated ACC
                    @Override
                    public Integer add(Tuple2<String, Integer> t1, Integer acc) {
                        return acc + t1.f1;
                    }

                    // Converts the final accumulator into the result value
                    @Override
                    public Integer getResult(Integer acc) {
                        return acc;
                    }

                    // Merges two accumulators (needed when windows merge, e.g. session windows)
                    @Override
                    public Integer merge(Integer acc, Integer acc1) {
                        return acc + acc1;
                    }
                })
                .print();
        env.execute();
    }
}
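The separate accumulator type pays off when the output has a different shape than the input, e.g. the average of the seconds field. A sketch using Tuple2<sum, count> as the accumulator and Double as the result:

vodStream
        .timeWindowAll(Time.seconds(5))
        .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, Double>() {
            @Override
            public Tuple2<Integer, Integer> createAccumulator() {
                return new Tuple2<>(0, 0); // (sum of seconds, element count)
            }

            @Override
            public Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> acc) {
                return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Integer, Integer> acc) {
                return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
            }

            @Override
            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
            }
        })
        .print();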
4. process
ProcessAllWindowFunction (non-keyed) and ProcessWindowFunction (keyed) buffer the whole window and expose it as an Iterable, together with a Context carrying window metadata; this is the most general, and most memory-hungry, of the window functions.
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // process -> ProcessAllWindowFunction
        vodStream
                .timeWindowAll(Time.seconds(5))
                .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Integer, TimeWindow>() {
                    // Processes the whole window at once: the buffered elements come in as Iterable<IN>,
                    // results go to Collector<OUT>; zero, one, or many records may be emitted
                    @Override
                    public void process(Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Integer> collector) throws Exception {
                        int result = 0;
                        for (Tuple2<String, Integer> t : iterable) {
                            result = result + t.f1;
                        }
                        collector.collect(result);
                    }
                })
                .print();
        env.execute();
    }
}
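What reduce and aggregate cannot do is look at window metadata; the Context can. For illustration, the same sum tagged with the end timestamp of the window that produced it:

vodStream
        .timeWindowAll(Time.seconds(5))
        .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<Long, Integer>, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> t : iterable) {
                    sum += t.f1;
                }
                // context.window() exposes the TimeWindow, so its bounds can be emitted with the result
                collector.collect(new Tuple2<>(context.window().getEnd(), sum));
            }
        })
        .print();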
The keyed counterpart is ProcessWindowFunction. Below the stream is keyed by vodid and the built-in sum is reproduced by hand inside process:
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("vodid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // Keyed windows
        // Tumbling time window with the built-in sum
        vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();
        // The same sum written by hand with a ProcessWindowFunction
        vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // All elements of a keyed window share the same key, so this map ends up with a single entry
                        Map<String, Integer> result = new HashMap<>();
                        for (Tuple2<String, Integer> t : iterable) {
                            result.merge(t.f0, t.f1, Integer::sum);
                        }
                        for (Map.Entry<String, Integer> entry : result.entrySet()) {
                            collector.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                        }
                    }
                })
                .print();
        env.execute();
    }
}
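Because process buffers every element until the window fires, it pays to combine it with an incremental function when only the aggregate plus metadata is needed: aggregate accepts a ProcessWindowFunction as a second argument, which then receives a single pre-aggregated value per key and window. A sketch emitting (vodid, window end, total seconds), with Tuple3 as an assumed output shape:

vodStream
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .aggregate(
                new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() { return 0; }

                    @Override
                    public Integer add(Tuple2<String, Integer> value, Integer acc) { return acc + value.f1; }

                    @Override
                    public Integer getResult(Integer acc) { return acc; }

                    @Override
                    public Integer merge(Integer a, Integer b) { return a + b; }
                },
                new ProcessWindowFunction<Integer, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Integer> aggregated, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        // The iterable holds exactly one element: the accumulated sum for this key and window
                        out.collect(new Tuple3<>(key.getField(0), context.window().getEnd(), aggregated.iterator().next()));
                    }
                })
        .print();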
5. apply
WindowFunction is the older full-window interface: it receives the window object directly instead of a Context and offers less functionality; ProcessWindowFunction is the preferred choice in new code.
public class WindowStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("dwd", new SimpleStringSchema(), KafkaUtils.consumerProps());
        DataStreamSource<String> sourceStream = env.addSource(consumer);
        SingleOutputStreamOperator<String> sourceWithWatermarkStream = sourceStream.assignTimestampsAndWatermarks(new EventTimeExtractor());
        SingleOutputStreamOperator<Tuple2<String, Integer>> vodStream = sourceWithWatermarkStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2<>(jn.getString("vodid"), Integer.parseInt(jn.getString("seconds")));
            }
        });
        vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    // apply receives the TimeWindow directly (timeWindow.getStart() / getEnd()) instead of a Context
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // All elements of a keyed window share the same key, so this map ends up with a single entry
                        Map<String, Integer> result = new HashMap<>();
                        for (Tuple2<String, Integer> t : iterable) {
                            result.merge(t.f0, t.f1, Integer::sum);
                        }
                        for (Map.Entry<String, Integer> entry : result.entrySet()) {
                            collector.collect(new Tuple2<>(entry.getKey(), entry.getValue()));
                        }
                    }
                })
                .print();
        env.execute();
    }
}