IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink整合Drools -> 正文阅读

[大数据]Flink整合Drools

Flink整合Drools

将drl文件转换为字符串,存入到mysql中(将drools规则字符串,构造成KieSession对象),通过canal监听此表,存入kafka中,然后通过flink,当作广播流,主流中数据得到动态规则,进行匹配。

(1)drl文件

package rules;

import cn.yyds.rulemk.demos.flink_drools.DataBean
import org.apache.commons.lang.StringUtils

rule "demo2"

    when
      $bean:DataBean()

    then
       String data = $bean.getData();
       if(data.startsWith("a")){
          $bean.setResult(data+" how are you");
       }else{
          $bean.setResult(data+" how old are you");
       }
end

(2)将drl文件转换为字符串,存入到mysql中

package cn.yyds.rulemk.demos.flink_drools;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class RuleInjector {

    public static void main(String[] args) throws Exception {


        String drlString = FileUtils.readFileToString(new File("rule_engine/src/main/resources/rules/demo2.drl"), "utf-8");

        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection("jdbc:mysql://hdp01:3306/abc?useUnicode=true&characterEncoding=utf8", "root", "123456");
        PreparedStatement st = conn.prepareStatement("insert into rule_demo (rule_name,drl_String,online) values (?,?,?)");
        st.setString(1,"demo2");
        st.setString(2,drlString);
        st.setString(3,"1");

        st.execute();
        st.close();
        conn.close();


    }

}

(3)Flink整合Drools

package cn.yyds.rulemk.demos.flink_drools;

import cn.doitedu.rulemk.marketing.utils.ConfigNames;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;

import java.util.List;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class FlinkDrools {
    public static void main(String[] args) throws Exception {

        // 读取业务数据
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.socketTextStream("localhost", 5656);
        DataStream<DataBean> dataBeanStream = dataStream.map(s -> new DataBean(s, null));

        // 读取规则
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hdp01:9092,hdp02:9092,hdp03:9092");
        props.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> drlStream = env.addSource(new FlinkKafkaConsumer<String>("rule-demo", new SimpleStringSchema(), props));

        MapStateDescriptor<String, KieSession> mapStateDescriptor = new MapStateDescriptor<>("rule_state", String.class, KieSession.class);
        BroadcastStream<String> broadcast = drlStream.broadcast(mapStateDescriptor);

        BroadcastConnectedStream<DataBean, String> connect = dataBeanStream.connect(broadcast);


        connect.process(new BroadcastProcessFunction<DataBean, String, String>() {
            @Override
            public void processElement(DataBean dataBean, ReadOnlyContext ctx, Collector<String> out) throws Exception {

                ReadOnlyBroadcastState<String, KieSession> state = ctx.getBroadcastState(mapStateDescriptor);
                Iterable<Map.Entry<String, KieSession>> entries = state.immutableEntries();
                for (Map.Entry<String, KieSession> entry : entries) {
                    KieSession kieSession = entry.getValue();

                    // 调用drools引擎,对进来的业务数据data进行处理
                    kieSession.insert(dataBean);
                    kieSession.fireAllRules();

                    // 输出处理结果
                    out.collect(dataBean.getResult());
                }
            }

            @Override
            public void processBroadcastElement(String canalBinlog, Context ctx, Collector<String> out) throws Exception {
                CanalBean canalBean = JSON.parseObject(canalBinlog, CanalBean.class);

                BroadcastState<String, KieSession> state = ctx.getBroadcastState(mapStateDescriptor);

                List<DbRecord> dbRecordList = canalBean.getData();
                for (DbRecord dbRecord : dbRecordList) {
                    // drools规则名称
                    String rule_name = dbRecord.getRule_name();

                    // drools规则内容
                    String drl_string = dbRecord.getDrl_string();

                    // 将drools规则字符串,构造成KieSession对象
                    KieHelper kieHelper = new KieHelper();
                    kieHelper.addContent(drl_string, ResourceType.DRL);
                    KieSession kieSession = kieHelper.build().newKieSession();

                    // 将构造好的KieSession对象放入广播state
                    String operationType = canalBean.getType();
                    String online = dbRecord.getOnline();
                    if ("INSERT".equals(operationType) || ("UPDATE".equals(operationType) && "1".equals(online))) {
                        log.info("注入一条规则,rule_name: {}",rule_name);
                        state.put(rule_name, kieSession);
                    } else if ("DELETE".equals(operationType) || ("UPDATE".equals(operationType) && "0".equals(online))) {
                        log.info("删除一条规则,rule_name: {}",rule_name);
                        state.remove(rule_name);
                    }
                }

            }
        }).print();

        env.execute();

    }

}

注:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DbRecord {
    private String id;
    private String rule_name;
    private String drl_string;
    private String online;

}



@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataBean {
    private String data;

    private String result;

}


@Data
@NoArgsConstructor
@AllArgsConstructor
public class CanalBean {

    private List<DbRecord> data;
    private String type;

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:28:13  更:2022-04-09 18:31:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:23:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码