Flink个人学习整理-CEP篇(十二)
FlinkCEP是在Flink实现的复杂事件处理库. 它可以让你在无界流中检测出特定的数据。 复杂事件可以定义为多个事件放在一起来处理。例如:连续两个失败事件 1、输入一个或多个事件构成的事件流 2、制定匹配规则 3、输出满足规则的复杂事件 处理事件的规则,被叫做“模式”(pattern) 一、个体模式 个体模式包含:单例模式和循环模式 单例模式:只能接收一个事件 循环模式:可以接收多个事件
start.times(3)
start.times(3).optional
start.times(1,3)
start.times(1,3).greedy
start.oneOrMore
start.timesOrMore(2).optional.greedy
注意:times() 默认使用宽松近邻模式
可以使用 .consecutive()
判断条件 .where() 对事件中的字段进行判断,决定是否接受,多个where相当于 and .or() 表示或相连 .until() 终止条件,清除状态,如果使用oneOrMore/oneOrMore.optional,建议使用
二、模式序列 1、严格近邻 .next 紧挨着
2、宽松近邻(只会匹配出最近的一条数据) .followedBy 在后面即可
3、非确定性宽松近邻(会将符合条件的全部匹配出来) .followedByAny
4、指定时间约束(与next 或者 followedBy 或者 followedByAny 组合使用) .within(Time.seconds(10))
5、不想让某个事件严格近邻 .notNext
6、不想让某个事件发生在两个事件之间 .notFollowedBy
注意: 1、序列需要以.begin()开始 2、模式序列不能以.notFollowedBy()结束 3、not类型的模式不能被.optional修饰
demo 1
Pattern<LoginEvent, LoginEvent> loginEventPattern = Pattern.<LoginEvent>begin("start")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return "fail".equals(value.getEventType());
}
})
.times(2)
.consecutive()
.within(Time.seconds(5));
PatternStream<LoginEvent> pattern = CEP.pattern(loginEventLongKeyedStream, loginEventPattern);
pattern.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
LoginEvent start = map.get("start").get(0);
LoginEvent next = map.get("start").get(1);
return start.getUserId() + " 在 "+start.getEventTime()+" 到 "+next.getEventTime()+" 连续失败两次";
}
}).print();
demo 2
Pattern<OrderEvent, OrderEvent> orderEventPattern = Pattern.<OrderEvent>begin("start")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return "create".equals(value.getEventType());
}
})
.followedBy("follow")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent value) throws Exception {
return "pay".equals(value.getEventType());
}
}).within(Time.minutes(15));
PatternStream<OrderEvent> patternStream = CEP.pattern(orderEventLongKeyedStream, orderEventPattern);
SingleOutputStreamOperator<String> result = patternStream.select(
new OutputTag<String>("No Pay") {
},
new PatternTimeoutFunction<OrderEvent, String>() {
@Override
public String timeout(Map<String, List<OrderEvent>> map, long l) throws Exception {
OrderEvent start = map.get("start").get(0);
return start.getOrderId() + " 订单未正常支付 " + start.getEventTime() + " 到 " + l / 1000 + " 之内";
}
},
new PatternSelectFunction<OrderEvent, String>() {
@Override
public String select(Map<String, List<OrderEvent>> map) throws Exception {
OrderEvent start = map.get("start").get(0);
OrderEvent follow = map.get("follow").get(0);
return start.getOrderId() + " 订单正常支付 " + start.getEventTime() + " 到 " + follow.getEventTime() + " 时间内完成";
}
}
);
result.print();
result.getSideOutput(new OutputTag<String>("No Pay") {}).print("Time Out");
|