public class DelayKeyStreamProcess extends KeyedProcessFunction<Integer, Iterable<HashMap<String, Object>>, Iterable<HashMap<String, Object>>> {
private transient MapState<Long, List<Iterable<HashMap<String, Object>>>> mapState;
/**
* 延迟十分钟,十分钟应该是够了
*/
private final Long delayTime = 1000 * 60 * 10L;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 设置生命有效期为两天
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(12))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
MapStateDescriptor<Long, List<Iterable<HashMap<String, Object>>>> cacheStorage = new MapStateDescriptor<Long, List<Iterable<HashMap<String, Object>>>>("CacheStorage", TypeInformation.of(new TypeHint<Long>() {
@Override
public TypeInformation<Long> getTypeInfo() {
return super.getTypeInfo();
}
}), TypeInformation.of(new TypeHint<List<Iterable<HashMap<String, Object>>>>() {
@Override
public TypeInformation<List<Iterable<HashMap<String, Object>>>> getTypeInfo() {
return super.getTypeInfo();
}
}));
cacheStorage.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(cacheStorage);
}
@Override
public void processElement(Iterable<HashMap<String, Object>> value, Context ctx, Collector<Iterable<HashMap<String, Object>>> out) throws Exception {
long cacheTime = System.currentTimeMillis() + delayTime;
ctx.timerService().registerProcessingTimeTimer(cacheTime);
List<Iterable<HashMap<String, Object>>> cacheIterables = mapState.get(cacheTime);
if (cacheIterables == null) {
cacheIterables = new ArrayList<>();
}
ArrayList<HashMap<String, Object>> hashMaps = new ArrayList<>();
Iterator<HashMap<String, Object>> iterator = value.iterator();
while (iterator.hasNext()) {
HashMap<String, Object> next = iterator.next();
HashMap<String, Object> deepCopyData = new HashMap<>();
deepCopyData.putAll(next);
hashMaps.add(deepCopyData);
}
cacheIterables.add(hashMaps);
mapState.put(cacheTime, cacheIterables);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Iterable<HashMap<String, Object>>> out) throws Exception {
super.onTimer(timestamp, ctx, out);
List<Iterable<HashMap<String, Object>>> cacheIterables = mapState.get(timestamp);
for (Iterable<HashMap<String, Object>> cacheIterable : cacheIterables) {
out.collect(cacheIterable);
}
mapState.remove(timestamp);
}
}
整体思路:存到状态中去,同时注册定时器,用定时器捞出来。
|