Flink程序中 使用定时器
有时候,我们在计算任务中需要使用到定时器来帮助我们处理业务,例如 订单的自动结算?自动好评? 定时收集?等等…
但需要注意的 我们无法为计算任务灵活的配置CRON表达式 ,仅仅只能指定触发的时刻。
一、什么样的Flink作业可以开启开启定时器
需要开启定时作业的JOB,必须是由KeyedProcessFunction 低阶函数进行数据处理,而非Window
我们可以在processElement 方法中 执行我们的数据处理逻辑以及开启定时器。
OnTimer 方法则是定时器触发时执行的具体的方法,
二、定时器功能展示
结果展示
下单时间与自动好评时间
对应订单好评时间到,检查是否评价与开启自动好评
三、逻辑实现
(1)自定义数据源
我这里是写死的,各位可根据自己的需求来实现RichSourceFunction 这个接口即可
package com.leilei;
import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
public class MyOrderSource extends RichSourceFunction<Order> {
private Boolean flag = true;
private final String[] products = new String[]{"黄焖鸡米饭", "北京烤鸭", "桥头排骨"};
private final String[] users = new String[]{"马邦德", "黄四郎", "张麻子"};
AtomicInteger num;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
Order order = Order.builder()
.product(products[RandomUtil.randomInt(3)])
.username(users[RandomUtil.randomInt(3)])
.orderId(UUID.randomUUID().toString().replace("-", "."))
.orderTime(System.currentTimeMillis())
.build();
Thread.sleep(5000);
num.incrementAndGet();
ctx.collect(order);
}
}
@Override
public void cancel() {
flag = false;
}
}
package com.leilei;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class Order {
private String product;
private String username;
private String orderId;
private Long orderTime;
}
(2)自定义订单KEY生成器
package com.leilei;
import java.text.SimpleDateFormat;
public class KeyUtil {
public static String buildKey(Order order) {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(order.getOrderTime());
return order.getUsername()+"--" + order.getProduct()+"--" + order.getOrderId() + "--下单时间:" + date;
}
}
(3)ProcessFunction处理函数 逻辑处理与定时器功能
(1)定义低阶处理函数
我们需要继承KeyedProcessFunction 这个抽象类,因为我的计算任务会根据KEY进行分组
(2)处理函数中编写计算逻辑与定时器注册
我们的计算程序如果是ProcessFunction的话,来一个元素便会触发一次processElement 方法,其是真正的流式处理,且只能一个一个的处理,不像Window 一般,我们可自定义窗口的大小(window是流与批的桥梁)。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TAGcu2me-1629024341221)(C:\Users\leile\AppData\Roaming\Typora\typora-user-images\image-20210815175350234.png)]
value:便是我们的输入元素
ctx:是执行环境,我们可以在其中开启定时器,获取当前KEY,获取侧位输出(OutPutTag)等等一系列操作
out:则是输出元素结果搜集器
(1)注册定时器
定时器的触发时间 支持处理时间与事件时间…我们可以根据选择自行进行注册
(2)处理时间定时器与事件时间定时器区别?
处理时间:即flink算子处理数据的时间(随着处理数据(元素)而不断增加)
ex:现在有三个数据 来源顺序如下 A > B > C
C 进入了算子 且注册了一个(12:02:00的处理时间定时器),Flink作业所在的机器上时间为 12:00:00,那么现在Flink作业的处理时间则为12:00:00
B 进入了算子,Flink作业所在的机器上时间为 12:01:00,那么现在Flink作业的处理时间则为12:01:00
A 进入了算子,Flink作业所在的机器上时间为 12:02:00,那么现在Flink作业的处理时间则为12:02:00
作业在处理了A元素后,便会触发 C 注册的定时器(处理时间已经大于等于12:02:00)
事件时间则为数据本身携带的时间属性(是否会增大受来源数据的时间影响,事件时间的选择受有KEY无无KEY影响,具体请查看前边的TimeWindow篇)
ex:现在有三个数据 来源顺序如下 A > B > C 假设他们的数据进入Flink程序有点延迟,但他们各自包含了自己的时间属性 A(12:01:00) > B(12:01:00) > C(12:00:00)
C 进入了算子 并注册了一个(12:02:00的事件时间定时器),且C元素的时间成为了 Flink作业最新的事件时间
B 进入了算子,B元素的事件时间为12:01:00,且B元素的时间成为了 Flink作业最新的事件时间
A 进入了算子,但A元素的事件时间还是为12:01:00, 这个时间等于 Flink作业事件时间,那么此时Flink作业的事件时间仍为12:01:00
A计算完成后,C触发注册的定时器(12:02:00)仍不会触发,因为目前Flink作业中,事件时间才到(12:01:00),其会等待直到有一个事件时间大于等于了12:02:00的数据到来才会触发
(3)处理函数完整代码
package com.leilei;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Map;
public class OrderSettlementProcess extends KeyedProcessFunction<String, Order, Object> {
private final Long overTime;
MapState<String, Long> productState;
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public OrderSettlementProcess(Long overTime) {
this.overTime = overTime;
}
@Override
public void processElement(Order currentOrder, Context ctx, Collector<Object> out) throws Exception {
long time = currentOrder.getOrderTime() + this.overTime;
ctx.timerService().registerProcessingTimeTimer(time);
productState.put(ctx.getCurrentKey(), time);
System.out.println(KeyUtil.buildKey(currentOrder) + " 订单过期时间为:" + time + " :" + df.format(time));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
super.onTimer(timestamp, ctx, out);
System.out.println("定时任务执行:" + timestamp + ":" + ctx.getCurrentKey());
Iterator<Map.Entry<String, Long>> orderIterator = productState.iterator();
if (orderIterator.hasNext()) {
Map.Entry<String, Long> orderEntry = orderIterator.next();
String key = orderEntry.getKey();
Long expire = orderEntry.getValue();
if (!isEvaluation(key) && expire == timestamp) {
System.err.println(key + ">>>>> 超过订单未评价且超过最大评价时间,默认设置五星好评!");
} else {
System.out.println(key + "订单已评价!");
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("productState",
TypeInformation.of(String.class),
TypeInformation.of(Long.class));
productState = getRuntimeContext().getMapState(mapStateDescriptor);
}
public Boolean isEvaluation(String orderKey) {
return false;
}
}
(4)Flink定时器DEMO主启动类
package com.leilei;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkTimer {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Order> streamSource = env.addSource(new MyOrderSource());
DataStream<Object> stream = streamSource
.keyBy(KeyUtil::buildKey)
.process(new OrderSettlementProcess(120 * 1000L));
stream.printToErr();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
|