????声明: 1. 本文为我的个人复习总结, 并非那种从零基础开始普及知识?内容详细全面, 言辞官方的文章 ??????????????2. 由于是个人总结, 所以用最精简的话语来写文章 ??????????????3. 若有错误不当之处, 请指出
侧输出流(SideOutput)
即分支流, 可以用来接收迟到数据, 也可以用来将数据分类成多个支流
对于滑动窗口, 有很多窗口重叠, 当迟到数据被所有窗口 都不接收时, 它才会进入侧输出流
只有Process这种最底层的API, 才能通过环境上下文 去使用侧输出流
案例: 将温度值低于30度的数据输出到 SideOutput
final OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp") { };
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>( ) {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
if (value.getTemperature( ) < 30) {
ctx.output(lowTempTag, value);
} else {
out.collect(value);
}
}
});
DataStream<SensorReading> lowTempStream = highTempStream.getSideOutput(lowTempTag);
highTempStream.print("high");
lowTempStream.print("low");
8种ProcessAPI:
-
ProcessFunction -
KeyedProcessFunction 得先keyBy, 会处理流的每一个元素, 以out.collect(xxx)的方式输出任意多个元素
-
·processElement(I value, Context ctx, Collector<O> out) ctx 可以
-
访问元素的时间戳 -
访问元素的key -
访问TimerService(ctx.timerService( ))
TimerService:
方法:
- EventTime相关
- long currentWatermark( ) 返回当前数据的事件时间
- void registerEventTimeTimer(long timestamp) 注册当前key的定时器
- void deleteEventTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
- ProcessingTime相关
- long currentProcessingTime( ) 返回当前数据的处理时间
- void registerProcessingTimeTimer(long timestamp) 注册当前key的定时器
- void deleteProcessingTimeTimer(long timestamp) 删除定时器, 如果没有则不执行
-
当定时器Timer触发后, 会执行回调函数onTimer( ) -
若注册窗口关闭时启动的定时器, 最好在WindowEndTime的基础上延迟1s; 因为到了临界点, 既要触发窗口计算, 又要触发定时器; 定时器任务又依赖于先窗口计算完毕, 所以给个1s的延迟较好
案例需求: 如果温度值在10秒钟之内(ProcessingTime)连续上升, 则报警
public class TempIncreaseWarning extends KeyedProcessFunction<String, SensorReading, String> {
private Integer interval;
public TempIncreaseWarning(Integer interval) {
this.interval = interval;
}
private ValueState<Double> lastTempState;
private ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext( ).getState(new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
timerTsState = getRuntimeContext( ).getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
Double lastTemp = lastTempState.value( );
Long timerTs = timerTsState.value( );
lastTempState.update(value.getTemperature( ));
if (value.getTemperature( ) > lastTemp && timerTs == null) {
long ts = ctx.timerService( ).currentProcessingTime( ) + interval * 1000L;
ctx.timerService( ).registerProcessingTimeTimer(ts);
timerTsState.update(ts);
}
else if (value.getTemperature( ) <= lastTemp && timerTs != null) {
ctx.timerService( ).deleteProcessingTimeTimer(timerTs);
timerTsState.clear( );
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
out.collect("传感器" + ctx.getCurrentKey( ) + "的温度连续" + interval + "秒上升");
timerTsState.clear( );
}
}
-
将数据输出到侧输出流 -
.onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 是一个回调函数,当之前注册的定时器触发时调用。 ? timestamp 是定时器所设定的触发运行 的时间戳 ? 如果注册一个已经过期的时间, 那么当再次输入数据时, 它才会触发定时器 -
CoProcessFunction connect后的流再.process 有processElement1( ) 和 processElement2( ) -
ProcessJoinFunction -
BroadcastProcessFunction A流有1个分区, B流有4个分区, B流要用到A流的数据, 所以需要将A流1个分区的数据广播到B流的4个分区 广播后再进行process处理 -
KeyedBroadcastProcessFunction -
ProcessWindowFunction 如 .aggregate(AggregateFunction<IN, ACC, OUT>aggFunction,ProcessWindowFunction<IN, OUT, KEY, W> windowFunction) -
ProcessAllWindowFunction
|