IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink个人学习整理-CEP篇(十二) -> 正文阅读

[大数据]Flink个人学习整理-CEP篇(十二)

Flink个人学习整理-CEP篇(十二)

FlinkCEP是在Flink实现的复杂事件处理库. 它可以让你在无界流中检测出特定的数据。
复杂事件可以定义为多个事件放在一起来处理。例如:连续两个失败事件
1、输入一个或多个事件构成的事件流
2、制定匹配规则
3、输出满足规则的复杂事件
处理事件的规则,被叫做“模式”(pattern)
一、个体模式
个体模式包含:单例模式和循环模式
单例模式:只能接收一个事件
循环模式:可以接收多个事件

//	量词
start.times(3)  // 匹配出现3次
start.times(3).optional	//	匹配出现0次,或3次
start.times(1,3)	//	匹配出现1,2,3次
start.times(1,3).greedy	//	匹配出现1,2,3次,并且尽可能多的重复匹配
start.oneOrMore	//	匹配出现1次或多次
start.timesOrMore(2).optional.greedy	//	匹配出现0次,1次或多次,并且尽可能多的重复匹配

注意:times() 默认使用宽松近邻模式
	可以使用 .consecutive()  //  指定使用严格近邻模式

判断条件
.where() 对事件中的字段进行判断,决定是否接受,多个where相当于 and
.or() 表示或相连
.until() 终止条件,清除状态,如果使用oneOrMore/oneOrMore.optional,建议使用

二、模式序列
1、严格近邻
.next 紧挨着

2、宽松近邻(只会匹配出最近的一条数据)
.followedBy 在后面即可

3、非确定性宽松近邻(会将符合条件的全部匹配出来)
.followedByAny

4、指定时间约束(与next 或者 followedBy 或者 followedByAny 组合使用)
.within(Time.seconds(10))

5、不想让某个事件严格近邻
.notNext

6、不想让某个事件发生在两个事件之间
.notFollowedBy

注意:
1、序列需要以.begin()开始
2、模式序列不能以.notFollowedBy()结束
3、not类型的模式不能被.optional修饰

demo 1

//  定义模式序列,循环模式
        Pattern<LoginEvent, LoginEvent> loginEventPattern = Pattern.<LoginEvent>begin("start")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return "fail".equals(value.getEventType());
                    }
                })
                .times(2)   //  默认使用为宽松近邻
                .consecutive()  //  指定使用严格近邻模式
                .within(Time.seconds(5));

        //  将模式序列作用于流  (因为加了within,所以有成功匹配事件 和 超时事件)
        PatternStream<LoginEvent> pattern = CEP.pattern(loginEventLongKeyedStream, loginEventPattern);

        //  提取匹配上的事件
        pattern.select(new PatternSelectFunction<LoginEvent, String>() {
            //  在本代码中Map中有一个元素,start
            //      List中只有2个,因为用times 2
            @Override
            public String select(Map<String, List<LoginEvent>> map) throws Exception {

                //  获取数据
                LoginEvent start = map.get("start").get(0);   // 第一个匹配事件
                LoginEvent next = map.get("start").get(1);   // 第二个匹配事件
//                LoginEvent next = map.get("next").get(0);   //  第二个匹配事件

                return start.getUserId() + " 在 "+start.getEventTime()+" 到 "+next.getEventTime()+" 连续失败两次";
            }
        }).print();

demo 2

//  定义模式序列
        Pattern<OrderEvent, OrderEvent> orderEventPattern = Pattern.<OrderEvent>begin("start")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "create".equals(value.getEventType());
                    }
                })
                .followedBy("follow")
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return "pay".equals(value.getEventType());
                    }
                }).within(Time.minutes(15));

        //  将模式序列作用在流上
        PatternStream<OrderEvent> patternStream = CEP.pattern(orderEventLongKeyedStream, orderEventPattern);

        //  提取正常匹配上的和超时事件
        SingleOutputStreamOperator<String> result = patternStream.select(
                new OutputTag<String>("No Pay") {
                },
                new PatternTimeoutFunction<OrderEvent, String>() {
                    @Override
                    public String timeout(Map<String, List<OrderEvent>> map, long l) throws Exception {

                        //  提取事件
                        OrderEvent start = map.get("start").get(0);

                        //  输出侧输出流
                        return start.getOrderId() + " 订单未正常支付 " + start.getEventTime() + " 到 " + l / 1000 + " 之内";
                    }
                },
                new PatternSelectFunction<OrderEvent, String>() {
                    @Override
                    public String select(Map<String, List<OrderEvent>> map) throws Exception {
                        //  提取事件
                        OrderEvent start = map.get("start").get(0);
                        OrderEvent follow = map.get("follow").get(0);

                        //  输出主流
                        return start.getOrderId() + " 订单正常支付 " + start.getEventTime() + " 到 " + follow.getEventTime() + " 时间内完成";
                    }
                }
        );

        result.print();

        result.getSideOutput(new OutputTag<String>("No Pay") {}).print("Time Out");
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:43:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/5 3:16:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码