?1、ProcessFunction api实现定时器
package com.atguigu.FProcessFunctionApi_api;
import com.atguigu.Zbeans.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 本类借助于processfunction api的定时器功能完成一个"监控温度连续十秒上升就报警"的小案例。
*
* 补充ProcessFunction api的基础知识:
* ProessFunction api常用来分流、获取时间戳、watermark和注册定时任务。
* 用一个DataStream对象调用process方法,方法内部传入对应的匿名ProcessFunction类即可。
* ProcessFunction类都继承了富函数,都可以访问上下文。
* ProcessFunction类可细分为八个子类:
* ProcessFunction、KeyedProcessFunction、CoProcessFunction、
* BroadcastProcessFunction、KeyedBroadcastProcessFunction、
* ProcessWindowFunciton、ProcessAllWindowFunciton。
* 其中KeyedProcessFunction用的最多,以下介绍大概用法:
* datastream.process(new KeyedProcessFunction(){必须实现processElement方法})
* 在processElement中可以获取到上下文、注册定时任务、开启侧道输出等。
* KeyedProcessFunction继承了富函数,所以也可以访问到上下文。
* 关于定时器:
* 注册:context.timerService().registerProcessingTimeTimer(ts); 在processElement中注册定时器,传入处理/事件时间戳。
* 删除:context.timerService().deleteEventTimeTimer(timerTS); timerTSState.clear();
* 定时器等待触发过程中不再满足触发条件,应删除定时器并清空定时器状态。定时器触发完毕,应清空定时器状态。
* 状态:定时器的状态保存的是设置定时器时的时间戳,使用ValueState保存下来,一可以避免后续删除定时器无法获取到时间戳的问题,
* 二可以通过状态的更新来设置新的定时器。
* onTimer方法:可以设置定时器触发时输出的信息,并且在这个方法中应清楚定时器的状态,因为定时器已经触发。
* open方法:完成对于状态的初始化。
* close方法:对一些状态的关闭、资源的清理。
*/
public class AProcessFunctionTest {
public static void main(String[] args) throws Exception {
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据并包装成pojo
DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
DataStream<SensorReading> mapStream = inputStream.map(line -> {
String[] splits = line.split(",");
return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
});
//处理
mapStream.keyBy(SensorReading::getId)
.process(new MyKeyedProcessFunction(10)) //传入定时器触发时间
.print();
//执行
env.execute("测试ProcessFunction api 操作");
}
/**
* 泛型一:分组后key的类型 泛型二:输入类型 泛型三:输出类型
* KeyedProcessFunction必须重写processElement方法
*
*/
public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {
private Integer delayTime; //定时器触发间隔
private ValueState<Double> lastTempState;//上一次的温度
private ValueState<Long> timerTSState;//定时器时间戳
public MyKeyedProcessFunction(Integer delayTime) { //接收定时器触发时间
this.delayTime = delayTime;
}
/**
* 第一个执行的方法,富函数初始化
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
//给lastTemp赋值,第一条数据来临之前初值为null
lastTempState =getRuntimeContext().getState(new ValueStateDescriptor<Double>("last_temp",Double.class));
//给timerTimeStamp赋值,第一条数据来临之前初值为null
timerTSState =getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer_timestamp",Long.class));
}
@Override
public void processElement(SensorReading sen, Context context, Collector<String> collector) throws Exception {
//取出状态的值
Double lastTemp = lastTempState.value();
Long timerTS = timerTSState.value();
/*
如果是第一条数据,注册定时器
如果不是第一条数据,但
温度上升且没有定时器的时候,则设置10秒后的定时器,开始等待
*/
if((lastTemp == null) || (sen.getTemperature() >= lastTemp && timerTS == null )){
//注册定时器并更新定时器状态
long ts = context.timerService().currentProcessingTime() + delayTime * 1000L;
context.timerService().registerProcessingTimeTimer(ts);
timerTSState.update(ts);
}
//如果温度下降并且有定时器的时候,删除定时器并且清空状态
if((lastTemp != null) && (sen.getTemperature() < lastTemp) && (timerTS != null)){
context.timerService().deleteEventTimeTimer(timerTS);
timerTSState.clear();
}
//更新温度状态
lastTempState.update(sen.getTemperature());
// collector.collect(sen.getId());
// //context可以获取到的信息
// context.timestamp(); //当前的时间戳
// context.getCurrentKey();//当前key
// //context.output(new OutputTag<SensorReading>(sen.getId()));
// context.timerService().currentProcessingTime();//获取处理时间
// context.timerService().currentWatermark();//获取事件时间
// context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()+delayTime*1000L);//按处理时间设置定时器
// context.timerService().registerEventTimeTimer((sen.getTimestamp()+10)*1000);//按事件时间注册定时器
// context.timerService().deleteProcessingTimeTimer(设置定时器用的时间戳);//删除定时器
// context.timerService().deleteEventTimeTimer((设置定时器用的时间戳));//删除定时器
}
/**
*配置定时器触发时输出的信息或者规则,同时清空定时器状态
* @param timestamp 定时器触发的时间
* @param ctx 上下文
* @param out 输出收集器
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("warning:传感器"+ ctx.getCurrentKey() + "温度值连续" + delayTime +"秒上升");
timerTSState.clear();
}
/**
* 清空温度
* @throws Exception
*/
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}
2、?ProcessFunction api实现侧输出流
package com.atguigu.FProcessFunctionApi_api;
import com.atguigu.Zbeans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* 本类借助ProcessFunction来实现一个侧输出流
* 1、初始化OutputTag对象的时候应初始化它的子类,防止泛型擦除。
* 2、process操作后直接返回类型,便于使用SingleOutputStreamOperator类型调用getSideOutput方法。
* 3、OutputTag对象和分类临界值应传入ProcessFunction中。
*/
public class BSideOutputTest {
public static void main(String[] args) throws Exception {
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据并包装成pojo
DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
DataStream<SensorReading> mapStream = inputStream.map(line -> {
String[] splits = line.split(",");
return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
});
/**
* 定义outputtag,将低温流作为侧输出流,侧输出流必须传入ProcessFunction函数
* outputTag定义为子类时为了防止泛型擦除
*/
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("low_temp") {};
SingleOutputStreamOperator<SensorReading> highTempStream = mapStream.process(new MySideOutput<SensorReading, SensorReading>(outputTag, 40.0));
//输出并执行
highTempStream.print("不低于40.0的都是高温流");
highTempStream.getSideOutput(outputTag).print("低于40.0的都是低温流");
env.execute("ProcessFunction api实现侧道输出机制");
}
public static class MySideOutput<S, S1> extends ProcessFunction<SensorReading,SensorReading>{
private OutputTag<SensorReading> outputTag; //侧输出流
private Double temp;//temp为高温流低温流分界线
public MySideOutput(OutputTag<SensorReading> outputTag, Double temp) {
this.outputTag = outputTag;
this.temp = temp;
}
@Override
public void processElement(SensorReading sen, Context con, Collector<SensorReading> col) throws Exception {
if(sen.getTemperature()>=temp)
col.collect(sen); //大于指定温度,正常输出
else
con.output(outputTag,sen); //小于指定温度,加入侧输出流输出
}
}
}
|