Flink Development: Session Windows (SessionWindows)
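A session window groups elements by gaps in activity. When no element arrives for longer than the specified gap, the current window is closed and the next element starts a new one. Session windows therefore have no fixed start or end time, and they never overlap, so each element belongs to exactly one session. The gap can be a fixed interval, or it can be computed dynamically by a function that derives it from the data in each element.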
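To make the gap rule concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes timestamps arrive in ascending order and follows the idea behind Flink's session assigners: each element opens its own window [ts, ts + gap), and windows that overlap are merged into one session. The class FixedGapSessions and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (no Flink): split ascending timestamps into
// sessions using a fixed gap.
final class FixedGapSessions {

    // Each element opens a candidate window [ts, ts + gap). If it overlaps (or touches)
    // the open session, the session is extended; otherwise a new session starts.
    static List<long[]> sessions(long[] sortedTimestamps, long gapMillis) {
        List<long[]> result = new ArrayList<>();
        long[] current = null; // {start, end} of the open session
        for (long ts : sortedTimestamps) {
            long end = ts + gapMillis;
            if (current != null && ts <= current[1]) {
                current[1] = Math.max(current[1], end); // still inside the session: extend it
            } else {
                current = new long[] {ts, end};          // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    // Renders sessions as "[start, end)" pairs separated by spaces.
    static String format(List<long[]> sessions) {
        StringBuilder sb = new StringBuilder();
        for (long[] s : sessions) {
            sb.append('[').append(s[0]).append(", ").append(s[1]).append(") ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in milliseconds, gap = 10 s
        long[] ts = {0, 2_000, 4_000, 30_000, 31_000};
        System.out.println(format(sessions(ts, 10_000)));
    }
}
```

With a 10-second gap, the made-up arrival times 0, 2, 4, 30 and 31 seconds form two sessions and main prints [0, 14000) [30000, 41000): each session ends one gap after its last element.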
1. Non-Keyed Session Windows
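import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class NonKeyedSessionWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines from a local socket (e.g. one opened with nc)
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        // Turn each line into (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // windowAll: one non-keyed window stream (parallelism 1); a session closes after 10 s without input
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = wordAndOne
                .windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> s1, Tuple2<String, Integer> s2) throws Exception {
                        // Keeps s1's word and only adds up the counts
                        s1.f1 = s1.f1 + s2.f1;
                        return s1;
                    }
                });
        reduce.print();
        env.execute("NonKeyedSessionWindowDemo");
    }
}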
Input (typed into netcat listening on port 8888):
C:\Users\zhibai>nc -lp 8888
a
b
c
d
e
a
b
c
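Output:
6> (a,5)
7> (a,3)
windowAll puts every element into the same session, whatever the word. The reduce function keeps the first element s1 and only adds up the counts, so each result carries the first word of its session (a) together with the number of lines in that session. Two results mean the input was split into two sessions: there was a pause of more than 10 seconds after e and before the second a. The 6> and 7> prefixes are added by print() and identify the parallel sink subtask that printed the line.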
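The following plain-Java sketch (not Flink code) replays the reduce function above over a non-keyed stream, to show where (a,5) and (a,3) come from. The arrival times are hypothetical: the sketch assumes a to e were typed within a few seconds and the second a, b, c more than 10 seconds later. A new session starts when the distance to the previous element exceeds the gap. The class name NonKeyedSessionReplay is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java replay (not Flink) of windowAll + a 10 s session gap
// + the reduce function from the example. Arrival times are made up.
final class NonKeyedSessionReplay {

    static List<String> run(long[] times, String[] words, long gapMillis) {
        List<String> results = new ArrayList<>();
        String word = null; // f0 of the running result: the session's first word
        int count = 0;      // f1 of the running result
        for (int i = 0; i < times.length; i++) {
            // More than one gap since the previous element: the session closes and is emitted
            if (word != null && times[i] - times[i - 1] > gapMillis) {
                results.add("(" + word + "," + count + ")");
                word = null;
            }
            if (word == null) {
                word = words[i]; // the first element of a session plays the role of s1
                count = 1;
            } else {
                count += 1;      // s1.f1 = s1.f1 + s2.f1; s1.f0 is never changed
            }
        }
        if (word != null) {
            results.add("(" + word + "," + count + ")"); // emit the last session
        }
        return results;
    }

    public static void main(String[] args) {
        long[] times = {0, 1_000, 2_000, 3_000, 4_000, 20_000, 21_000, 22_000};
        String[] words = {"a", "b", "c", "d", "e", "a", "b", "c"};
        System.out.println(run(times, words, 10_000));
    }
}
```

With these timestamps main prints [(a,5), (a,3)]. If the goal is a count per word, the word would have to become a key (as in the next section) rather than a field that the reduce function happens to keep.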
2. Keyed Session Windows
2.1 Fixed Gap
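import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class KeyedFixedGapSessionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        // Each line is "<word> <count>", e.g. "a 1"
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });
        // Partition by word, so every word gets its own sessions
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> s) throws Exception {
                return s.f0;
            }
        });
        // A word's session closes once that word has seen no input for 10 s
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> processingWindow = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        // Sum field f1 (the count) per session
        processingWindow.sum(1).print();
        env.execute("KeyedFixedGapSessionDemo");
    }
}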
Input:
C:\Users\zhibai>nc -lp 8888
a 1
a 1
b 1
b 1
b 1
b 1
b 1
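Output:
6> (a,2)
2> (b,5)
After keyBy, each word has its own sessions: the two a lines are summed into (a,2) and the five b lines into (b,5), and each result is emitted once that word has seen no input for 10 seconds.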
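A plain-Java sketch of the keyed case, again without Flink and with hypothetical arrival times. The only change from the non-keyed logic is that session state is kept per key, so elements of one word never extend or close another word's session. The class name KeyedSessionSums is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch (not Flink) of keyBy + a fixed-gap session window + sum(1).
final class KeyedSessionSums {

    // times must be ascending; returns, per key, the sums of its sessions in order
    static Map<String, List<Integer>> run(long[] times, String[] keys, int[] values, long gapMillis) {
        Map<String, List<Integer>> closed = new TreeMap<>();
        Map<String, long[]> open = new TreeMap<>(); // key -> {last element time, running sum}
        for (int i = 0; i < times.length; i++) {
            long[] session = open.get(keys[i]);
            // Only this key's own last element decides whether its session has ended
            if (session != null && times[i] - session[0] > gapMillis) {
                closed.computeIfAbsent(keys[i], k -> new ArrayList<>()).add((int) session[1]);
                session = null;
            }
            if (session == null) {
                session = new long[] {times[i], 0};
                open.put(keys[i], session);
            }
            session[0] = times[i];   // remember the latest element of the session
            session[1] += values[i]; // sum(1): add up field f1
        }
        // Input is exhausted: emit every session that is still open
        for (Map.Entry<String, long[]> e : open.entrySet()) {
            closed.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add((int) e.getValue()[1]);
        }
        return closed;
    }

    public static void main(String[] args) {
        // Same lines as the input above, with hypothetical one-second spacing
        long[] times = {0, 1_000, 2_000, 3_000, 4_000, 5_000, 6_000};
        String[] keys = {"a", "a", "b", "b", "b", "b", "b"};
        int[] values = {1, 1, 1, 1, 1, 1, 1};
        System.out.println(run(times, keys, values, 10_000));
    }
}
```

Here main prints {a=[2], b=[5]}. Interleaving does not matter: for a at 0 s, b at 1 s, a at 5 s and a at 30 s, key a gets the sessions [2, 1] while b's single line stays in its own session.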
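2.2 Dynamic Gap
Here the gap is derived from the data itself: a SessionWindowTimeGapExtractor reads the second field of each element and uses it as the gap in seconds.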
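import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class KeyedDynamicGapSessionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        // Each line is "<word> <number>"; the number is both the value to sum and the gap in seconds
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = socketStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] fields = s.split(" ");
                return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordAndCount.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> s) throws Exception {
                return s.f0;
            }
        });
        // The gap is computed per element instead of being fixed
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> processingWindow = keyedStream.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Integer>>() {
            @Override
            public long extract(Tuple2<String, Integer> element) {
                // Gap in milliseconds (1000L avoids int overflow); Flink rejects gaps <= 0
                return element.f1 * 1000L;
            }
        }));
        processingWindow.sum(1).print();
        env.execute("KeyedDynamicGapSessionDemo");
    }
}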
Input:
C:\Users\zhibai>nc -lp 8888
a 10
a 1
a 1
b 1
c 1
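Output:
2> (b,1)
6> (a,12)
4> (c,1)
sum(1) adds up the second field, which here also serves as the gap. The line a 10 keeps a's session open for 10 seconds, so the two following a 1 lines fall inside it and are summed to (a,12). b and c have a gap of only 1 second, so their sessions close about 1 second after their single line, which is why (b,1) is printed before (a,12).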
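A hypothetical plain-Java sketch (not Flink's implementation) of how per-element gaps combine. It assumes ascending arrival times, gives each element its own window [t, t + gap) using the same f1 * 1000 rule as the extractor above, and merges a key's windows when they overlap, keeping the later end. Ordering results by window end is only an approximation of when a processing-time trigger fires. The arrival times and the class name DynamicGapSessions are made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (not Flink) of keyBy + dynamic-gap session windows + sum(1).
final class DynamicGapSessions {

    static final class Session {
        final String key;
        long end; // exclusive end of the merged window
        int sum;  // result of sum(1)

        Session(String key, long end) {
            this.key = key;
            this.end = end;
        }

        @Override
        public String toString() {
            return "(" + key + "," + sum + ")";
        }
    }

    // Same rule as the extractor in the example: f1 seconds, in milliseconds
    static long gapMillis(int value) {
        return value * 1000L;
    }

    // times must be ascending; results are ordered by window end (approximate firing order)
    static List<Session> run(long[] times, String[] keys, int[] values) {
        Map<String, Session> open = new LinkedHashMap<>();
        List<Session> results = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            long end = times[i] + gapMillis(values[i]); // this element's own window [times[i], end)
            Session s = open.get(keys[i]);
            if (s != null && times[i] <= s.end) {
                s.end = Math.max(s.end, end); // overlaps the open session: merge, keep the later end
            } else {
                if (s != null) {
                    results.add(s);           // the key's previous session has already ended
                }
                s = new Session(keys[i], end);
                open.put(keys[i], s);
            }
            s.sum += values[i];
        }
        results.addAll(open.values());
        results.sort(Comparator.comparingLong((Session x) -> x.end));
        return results;
    }

    public static void main(String[] args) {
        // Input from the example with hypothetical arrival times (ms)
        long[] times = {0, 1_000, 2_000, 3_000, 9_500};
        String[] keys = {"a", "a", "a", "b", "c"};
        int[] values = {10, 1, 1, 1, 1};
        System.out.println(run(times, keys, values));
    }
}
```

With these timestamps main prints [(b,1), (a,12), (c,1)]. Because merging keeps the later end, a small gap never shortens a session that a larger gap already opened.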