首先 flink 的窗口分配是发生在 StreamTask 初始化的过程中。 核心方法是 TumblingProcessingTimeWindows.assignWindows(…)
@Override
public Collection<TimeWindow> assignWindows(
Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
long start =
TimeWindow.getWindowStartWithOffset(
now, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
可以通过这个方法往上点,会发现他是在StreamTask初始化的时候触发的。 这个方法调用了一个很重要的方法来计算窗口开始时间:TimeWindow.getWindowStartWithOffset(…)
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
注释里有我的简单总结,供参考。
|