IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink cep数据源keyby union后 keybe失效 -> 正文阅读

[大数据]flink cep数据源keyby union后 keybe失效

问题背景:测试同学反馈,cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警

策略条件:

选择了两个kafka的的topic的数据作为数据源,

对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type? = 1

对B 数据源 test-topic2,进行条件过滤,过滤条件为:login_type =? 2

分组条件 为? ?src_ip,hostname两个字段进行分组

进行followby 关联。时间关联的最大时间间隔为? 60秒

运行并行度设置为3

通过SourceStream打印的原始数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}
1> {"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type?":"2"}

经过cep处理后,产了告警

产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859021060,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666859020192,"create_time_desc":"2022-10-27 16:23:40","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666859021231,"create_time_desc":"2022-10-27 16:23:41","event_type_value":"single","id":"67d32010-1f66-4850-b110-a7087e419c64_0","login_type":"2"}]}

经过src_ip,和hostname分组后, 理论上应该只分组后的相同的 scr_ip,hostname进行事件关联告警

结果其他的分组数据也参和进来关联告警了。?

期望的是? login_type = 1 出现至少两次, 接着login_type=2的至少出现1次,且相同的src_ip和hostname

然后结果是下面数据也产生了告警。

{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":1}
{"src_ip":"172.11.11.1","hostname":"hostname1","login_type":2}

怀疑是分组没生效。

然后debug数据源那块的方法kafkaStreamSource() 里面有进行分组,debug后发现确实也进行了keyby

后来找不到其他问题,纠结了下, 怀疑是不是 KeyedSteam.union(KeyedStream)后得到的就不是一个KeyedSteam了。 所以

出现问题的原始代码数据源代码:

//程序具体执行流程
 DataStream<JSONObject> sourceStream = SourceProcess.getKafkaStream(env, rule);
            DataStream<JSONObject> resultStream = TransformProcess.process(sourceStream, rule);
            SinkProcess.sink(resultStream, rule);



public static DataStream<JSONObject> getKafkaStream(StreamExecutionEnvironment env, Rule rule) {
        DataStream<JSONObject> inputStream = null;
        List<Event> events = rule.getEvents();
        if (events.size() > SharingConstant.NUMBER_ZERO) {
            for (Event event : events) {
                FlinkKafkaConsumer<JSONObject> kafkaConsumer =
                        new KafkaSourceFunction(rule, event).init();
                if (inputStream != null) {
                    // 多条 stream 合成一条 stream
                    inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));
                } else {
                    // 只有一条 stream
                    inputStream = kafkaStreamSource(env, event, rule, kafkaConsumer);
                }
            }
        }
        return inputStream;
    }



private static DataStream<JSONObject> kafkaStreamSource(
            StreamExecutionEnvironment env,
            Event event,
            Rule rule,
            FlinkKafkaConsumer<JSONObject> kafkaConsumer) {
        DataStream<JSONObject> inputStream = env.addSource(kafkaConsumer);

        // 对多个黑白名单查询进行循环
        String conditions = event.getConditions();
        while (conditions.contains(SharingConstant.ARGS_NAME)) {
            // 使用新的redis 数据结构,进行 s.include 过滤
            inputStream = AsyncDataStream.orderedWait(inputStream,new RedisNameListFilterSourceFunction(s,rule.getSettings().getRedis()),30,TimeUnit.SECONDS,2000);

            conditions = conditions.replace(s, "");
        }
        // 一般过滤处理
        inputStream = AsyncDataStream.orderedWait(inputStream,
                new Redis3SourceFunction(event, rule.getSettings().getRedis()), 30, TimeUnit.SECONDS, 2000);

        // kafka source 进行 keyBy 处理
        return KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
    }

public static DataStream<JSONObject> keyedBy(
            DataStream<JSONObject> input, Map<String, String> groupBy) {
        if (null == groupBy || groupBy.isEmpty() ||"".equals(groupBy.values().toArray()[SharingConstant.NUMBER_ZERO])){
            return input;
        }
        return input.keyBy(
                new TwoEventKeySelector(
                        groupBy.values().toArray()[SharingConstant.NUMBER_ZERO].toString()));
    }

public class TwoEventKeySelector implements KeySelector<JSONObject, String> {
    private static final long serialVersionUID = 8534968406068735616L;
    private final String groupBy;

    public TwoEventKeySelector(String groupBy) {
        this.groupBy = groupBy;
    }

    @Override
    public String getKey(JSONObject event) {
        StringBuilder keys = new StringBuilder();
        for (String key : groupBy.split(SharingConstant.DELIMITER_COMMA)) {
            keys.append(event.getString(key));
        }
        return keys.toString();
    }
}

问题出现在这里:

// 多条 stream 合成一条 stream
? ? ? ? ? ? ? ? ? ? inputStream = inputStream.union(kafkaStreamSource(env, event, rule, kafkaConsumer));

kafkaStreamSource()这个方法返回的是 KeyedStream ,

两个KeyedStream unio合并后,? 本来以为返回时KeyedStream,结果确是DataStream类型,

结果导致cep分组不生效,一个告警中出现了其他分组的数据。

解决方法, 就是在cep pattern前 根据是否有分组条件再KeyedBy一次

  private static DataStream<JSONObject> patternProcess(DataStream<JSONObject> inputStream, Rule rule) {
        PatternGen patternGenerator = new PatternGen(rule.getPatterns(), rule.getWindow().getSize());
        Pattern<JSONObject, JSONObject> pattern = patternGenerator.getPattern();

        if (!rule.getGroupBy().isEmpty()){
            inputStream = KeyedByStream.keyedBy(inputStream, rule.getGroupBy());
        }

        PatternStream<JSONObject> patternStream = CEP.pattern(inputStream, pattern);
        return patternStream.inProcessingTime().select(new RuleSelectFunction(rule.getAlarmInfo(), rule.getSelects()));

输入数据:

?{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860300012,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
?{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860299272,"create_time_desc":"2022-10-27 16:44:59","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"1"}
?{"src_ip":"172.11.11.1","hostname":"hostname2","as":"B","create_time":1666860300196,"create_time_desc":"2022-10-27 16:45:00","event_type_value":"single","id":"1288a709-d2b3-41c9-b7b7-e45149084514_0","login_type":"2"}

不产生告警,符合预期

再次输入同分组的数据:

2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}
2> {"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}
产生告警:{A=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860368471,"create_time_desc":"2022-10-27 16:46:08","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}, {"src_ip":"172.11.11.1","hostname":"hostname1","as":"A","create_time":1666860369307,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"1"}], B=[{"src_ip":"172.11.11.1","hostname":"hostname1","as":"B","create_time":1666860369478,"create_time_desc":"2022-10-27 16:46:09","event_type_value":"single","id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","app_id":"2"}]}
告警输出:{"org_log_id":"61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0,61004dd6-69ec-4d67-845c-8c15e7cc4bf7_0","event_category_id":1,"event_technique_type":"无","event_description":"1","alarm_first_time":1666860368471,"src_ip":"172.11.11.1","hostname":"hostname1","intelligence_id":"","strategy_category_id":"422596451785379862","intelligence_type":"","id":"cc1cd8cd-a626-4916-bdd3-539ea57e898f","event_nums":3,"event_category_label":"资源开发","severity":"info","create_time":1666860369647,"strategy_category_name":"网络威胁分析","rule_name":"ceptest","risk_score":1,"data_center":"guo-sen","baseline":[],"sop_id":"","event_device_type":"无","rule_id":214,"policy_type":"pattern","strategy_category":"/NetThreatAnalysis","internal_event":"1","event_name":"ceptest","event_model_source":"/RuleEngine/OnLine","alarm_last_time":1666860369478}

产生告警符合预期

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 12:04:09  更:2022-10-31 12:08:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 17:49:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码