一、简介
滚动窗口将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定大小并且互不重叠。例如，如果指定一个大小为 5 分钟的滚动窗口，则每隔 5 分钟对当前窗口进行一次计算，并启动一个新窗口。下面我们实现对应的例子。
参考：Flink(四)：窗口简介_在前进的路上-CSDN博客
二、代码
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
 * Demo Flink job: reads JSON messages from the Kafka topic {@code test_window}, assigns
 * event-time timestamps from each message's {@code time} field, keys the stream by the
 * {@code key} field and counts the elements of every 5-second tumbling event-time window.
 *
 * <p>Each fired window emits one {@code (key, count, windowStart, windowEnd)} tuple, which is
 * printed to standard output.
 */
public class WindowsTumblingWindowsTest {

    /**
     * Builds the pipeline and runs it under the job name {@code test_windows}.
     *
     * <p>Any exception raised while building or executing the job is caught and its stack
     * trace is printed; it is not rethrown.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka consumer configuration.
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
            consumerProperties.setProperty("group.id", "test");
            consumerProperties.setProperty(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProperties.setProperty(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
                    "test_window", new SimpleStringSchema(), consumerProperties);
            // Start from the offsets last committed for the configured consumer group,
            // which is why group.id must be set above.
            consumer.setStartFromGroupOffsets();

            // Register the Kafka consumer as the job's source.
            DataStreamSource<String> sourceStream = env.addSource(consumer);

            // Parse every message and declare which field carries the event time.
            DataStream<Map<String, Object>> dataStream = sourceStream
                    .map(new MapFunction<String, Map<String, Object>>() {
                        @Override
                        @SuppressWarnings("unchecked") // JSON.parse is declared to return Object
                        public Map<String, Object> map(String value) throws Exception {
                            return (Map<String, Object>) JSON.parse(value);
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                    .withTimestampAssigner(
                                            new SerializableTimestampAssigner<Map<String, Object>>() {
                                                @Override
                                                public long extractTimestamp(
                                                        Map<String, Object> map, long recordTimestamp) {
                                                    // The "time" field is parsed as a long event timestamp.
                                                    return Long.parseLong(map.get("time").toString());
                                                }
                                            }));

            // Key by the "key" field, then count the elements of each 5-second tumbling window.
            DataStream<Tuple4<String, Integer, Long, Long>> windowDataStream = dataStream
                    .keyBy(new KeySelector<Map<String, Object>, Tuple1<String>>() {
                        @Override
                        public Tuple1<String> getKey(Map<String, Object> value) throws Exception {
                            return Tuple1.of(value.get("key").toString());
                        }
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .process(new ProcessWindowFunction<
                            Map<String, Object>, Tuple4<String, Integer, Long, Long>, Tuple1<String>, TimeWindow>() {
                        @Override
                        public void process(
                                Tuple1<String> key,
                                Context context,
                                Iterable<Map<String, Object>> elements,
                                Collector<Tuple4<String, Integer, Long, Long>> out) throws Exception {
                            int count = 0;
                            for (Map<String, Object> ignored : elements) {
                                count++;
                            }
                            TimeWindow window = context.window();
                            // key.f0 is the same string the KeySelector derived from the "key" field,
                            // so it does not need to be re-read from the elements.
                            out.collect(Tuple4.of(key.f0, count, window.getStart(), window.getEnd()));
                        }
                    });

            windowDataStream.print();
            env.execute("test_windows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
说明
1、设置水位线（允许最多 2 秒的乱序）
   WatermarkStrategy.<Map<String, Object>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
2、TumblingEventTimeWindows 窗口时间（5 秒）
   TumblingEventTimeWindows.of(Time.seconds(5))
三、执行结果
发送数据：
{"key":"002","time":1642263310000} {"key":"002","time":1642263311000} {"key":"002","time":1642263313000} {"key":"002","time":1642263315000} {"key":"002","time":1642263316000} {"key":"002","time":1642263317000} {"key":"002","time":1642263318000} {"key":"002","time":1642263319000} {"key":"002","time":1642263320000}
窗口统计的执行结果
7> (002,3,1642263310000,1642263315000)
7> (002,5,1642263315000,1642263320000)