【时间】2022.06.14 周二
【题目】【Flink入门(7)】Flink的ProcessFunction API(底层API)
本专栏是尚硅谷Flink课程的笔记与思维导图。
目录
引言
0.概述
1.KeyedProcessFunction
示例代码
2 TimerService和定时器(Timers)
示例代码
3 侧输出流(SideOutput)
4 CoProcessFunction
总思维导图
引言
本节介绍Flink中的ProcessFunction API(底层API),重点是KeyedProcessFunction的基本用法与示例代码,并简要介绍定时器(TimerService)、侧输出流(SideOutput)和CoProcessFunction。
0.概述
1.KeyedProcessFunction
(此处原为示意图,图片缺失)
示例代码
设置一个定时器:每收到一条数据,就注册一个在5s(处理时间)后触发的定时器,触发时打印提示信息。
关键是实现processElement()和onTimer()方法。
package processfunction;
import apitest.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Demonstrates the basic use of {@link KeyedProcessFunction}: for every incoming reading it emits
 * the length of the sensor id and registers a processing-time timer that fires 5 s later.
 */
public class ProcessTest1_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Socket text stream (feed it with `nc -lk 7777`).
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Parse each "id,timestamp,temperature" line into a SensorReading.
        // Long.valueOf / Double.valueOf replace the deprecated boxing constructors
        // (new Long(String) / new Double(String)) and return the same types.
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Key the stream by the "id" field, then apply the custom process function.
        // NOTE(review): field-expression keyBy("id") is deprecated in newer Flink releases;
        // switching to keyBy(SensorReading::getId) would require MyProcess to use String as
        // its key type instead of Tuple.
        dataStream.keyBy("id")
                .process(new MyProcess())
                .print();

        env.execute();
    }

    /**
     * Emits the length of each reading's id and registers a processing-time timer 5 s after the
     * element is processed. When a timer fires, its timestamp is printed to stdout.
     */
    public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
        // Meant to remember the registered timer's timestamp so the timer can be deleted later;
        // only referenced by the commented-out examples below.
        ValueState<Long> tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Example: initialise the timer-timestamp state.
            // tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());

            // Other Context APIs available here:
            //   ctx.timestamp()                            - timestamp of the current element / firing timer
            //   ctx.getCurrentKey()                        - key of the element being processed
            //   ctx.output(tag, value)                     - emit to a side output
            //   ctx.timerService().currentProcessingTime() - current processing time
            //   ctx.timerService().currentWatermark()      - current event-time watermark

            // Fire a timer 5 seconds (processing time) from now.
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000L);

            // Example: remember the timer so it can be deleted later.
            // tsTimerState.update(ctx.timerService().currentProcessingTime() + 5000L);
            // Example: an event-time timer instead.
            // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // Example: delete the timer registered for a given time.
            // ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " 定时器触发");
            // Also available: ctx.getCurrentKey(), ctx.output(tag, value), ctx.timeDomain()
        }

        @Override
        public void close() throws Exception {
            // Keyed state (e.g. tsTimerState) should not be cleared here: close() is not bound to
            // any key, so clear() would not clear state for all keys.
        }
    }
}
启动本地socket
nc -lk 7777
输入
sensor_1,1547718207,36.3
输出(第一行8是id "sensor_1"的长度;约5s后定时器触发,打印触发时间戳)
8
1612283803911 定时器触发
2 TimerService和定时器(Timers)
(此处原为示意图,图片缺失)
示例代码
需求:监控温度传感器的温度值,如果温度值在10 秒钟之内(processing time)连续上升,则报警。
思路:在温度第一次上升时注册一个10s后触发的定时器;之后若温度下降或不变,就删除该定时器(不再报警);若定时器到期时仍未被删除,说明10s内温度持续上升,触发报警。
package processfunction;
import apitest.beans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessTest2_ApplicationCase {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
// 从socket中获取数据
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换数据为SensorReading类型
DataStream<SensorReading> sensorReadingStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 如果存在连续10s内温度持续上升的情况,则报警
sensorReadingStream.keyBy(SensorReading::getId)
.process(new TempConsIncreWarning(Time.seconds(10).toMilliseconds()))
.print();
env.execute();
}
// 如果存在连续10s内温度持续上升的情况,则报警
public static class TempConsIncreWarning extends KeyedProcessFunction<String, SensorReading, String> {
public TempConsIncreWarning(Long interval) {
this.interval = interval;
}
// 报警的时间间隔(如果在interval时间内温度持续上升,则报警)
private Long interval;
// 上一个温度值
private ValueState<Double> lastTemperature;
// 最近一次定时器的触发时间(报警时间)
private ValueState<Long> recentTimerTimeStamp;
@Override
public void open(Configuration parameters) throws Exception {
lastTemperature = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemperature", Double.class));
recentTimerTimeStamp = getRuntimeContext().getState(new ValueStateDescriptor<Long>("recentTimerTimeStamp", Long.class));
}
@Override
public void close() throws Exception {
lastTemperature.clear();
recentTimerTimeStamp.clear();
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
// 当前温度值
double curTemp = value.getTemperature();
// 上一次温度(没有则设置为当前温度)
double lastTemp = lastTemperature.value() != null ? lastTemperature.value() : curTemp;
// 计时器状态值(时间戳),后面通过这个state判断是否已经设置了定时器
Long timerTimestamp = recentTimerTimeStamp.value();
// 如果 当前温度 > 上次温度 并且 没有设置报警计时器,则设置
if (curTemp > lastTemp && null == timerTimestamp) {
long warningTimestamp = ctx.timerService().currentProcessingTime() + interval;
ctx.timerService().registerProcessingTimeTimer(warningTimestamp);
recentTimerTimeStamp.update(warningTimestamp);
}
// 如果 当前温度 < 上次温度,且 设置了报警计时器,则清空计时器
else if (curTemp <= lastTemp && timerTimestamp != null) {
ctx.timerService().deleteProcessingTimeTimer(timerTimestamp);
recentTimerTimeStamp.clear();//timerTimestamp为null
}
// 更新保存的温度值
lastTemperature.update(curTemp);
}
// 定时器任务
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 触发报警,并且清除 定时器状态值
out.collect("传感器" + ctx.getCurrentKey() + "温度值连续" + interval + "ms上升");
recentTimerTimeStamp.clear();
}
}
}
启动本地socket,之后输入数据:
nc -lk 7777
输入
sensor_1,1547718199,35.8
sensor_1,1547718199,34.1
sensor_1,1547718199,34.2
sensor_1,1547718199,35.1
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_10,1547718205,39
sensor_6,1547718201,18
sensor_7,1547718202,9.1
输出
传感器sensor_1温度值连续10000ms上升
传感器sensor_10温度值连续10000ms上升
传感器sensor_6温度值连续10000ms上升
传感器sensor_7温度值连续10000ms上升
3 侧输出流(SideOutput)
(此处原为示意图,图片缺失)
需求:温度>=30的数据放入高温流(主流)输出,否则放入低温流(侧输出流)输出。
package processfunction;
import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Splits sensor readings with a side output: readings with temperature >= 30 go to the main
 * (high-temperature) stream, all others to the "lowTemp" side output.
 */
public class ProcessTest3_SideOuptCase {
    public static void main(String[] args) throws Exception {
        // Create the execution environment with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read raw lines from a local socket (feed it with `nc -lk 7777`).
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Parse each "id,timestamp,temperature" line into a SensorReading.
        // Long.valueOf / Double.valueOf replace the deprecated boxing constructors.
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Tag identifying the low-temperature side output.
        // An OutputTag must always be an anonymous inner class
        // so that Flink can derive a TypeInformation for the generic type parameter.
        OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};

        // Use a ProcessFunction to split the stream via the side output.
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
                // Requirement: temperature >= 30 -> main (high-temp) stream; otherwise -> side output.
                // (Previously "> 30", which misrouted readings of exactly 30.)
                if (value.getTemperature() >= 30) {
                    out.collect(value);
                } else {
                    ctx.output(lowTempTag, value);
                }
            }
        });

        highTempStream.print("high-temp");
        highTempStream.getSideOutput(lowTempTag).print("low-temp");

        env.execute();
    }
}
输入
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
输出
high-temp> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
low-temp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
low-temp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
high-temp> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
4 CoProcessFunction
(此处原为示意图,图片缺失)
使用方法:stream1.connect(stream2).process(getCoProcessFunctionInstance())。CoProcessFunction需要实现processElement1()和processElement2()两个方法,分别处理来自第一条流和第二条流的元素。
(此处原为示意图,图片缺失)
具体见:Flink处理函数实战之五:CoProcessFunction(双流处理)_程序员欣宸的博客-CSDN博客
总思维导图
(此处原为总思维导图,图片缺失)