Flink整合Drools
整体流程:将 drl 文件内容读取为字符串并存入 MySQL;通过 canal 监听该表,把变更以 binlog 消息的形式写入 kafka;Flink 将该 kafka 流作为广播流(在广播端把 drools 规则字符串构造成 KieSession 对象存入广播状态),主流中的数据从广播状态获取动态规则并进行匹配。
(1)drl文件
package rules;
import cn.yyds.rulemk.demos.flink_drools.DataBean
import org.apache.commons.lang.StringUtils
rule "demo2"
when
$bean:DataBean()
then
String data = $bean.getData();
if(data.startsWith("a")){
$bean.setResult(data+" how are you");
}else{
$bean.setResult(data+" how old are you");
}
end
(2)将drl文件转换为字符串,存入到mysql中
package cn.yyds.rulemk.demos.flink_drools;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class RuleInjector {
public static void main(String[] args) throws Exception {
String drlString = FileUtils.readFileToString(new File("rule_engine/src/main/resources/rules/demo2.drl"), "utf-8");
Class.forName("com.mysql.jdbc.Driver");
Connection conn = DriverManager.getConnection("jdbc:mysql://hdp01:3306/abc?useUnicode=true&characterEncoding=utf8", "root", "123456");
PreparedStatement st = conn.prepareStatement("insert into rule_demo (rule_name,drl_String,online) values (?,?,?)");
st.setString(1,"demo2");
st.setString(2,drlString);
st.setString(3,"1");
st.execute();
st.close();
conn.close();
}
}
(3)Flink整合Drools
package cn.yyds.rulemk.demos.flink_drools;
import cn.doitedu.rulemk.marketing.utils.ConfigNames;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class FlinkDrools {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.socketTextStream("localhost", 5656);
DataStream<DataBean> dataBeanStream = dataStream.map(s -> new DataBean(s, null));
Properties props = new Properties();
props.setProperty("bootstrap.servers", "hdp01:9092,hdp02:9092,hdp03:9092");
props.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> drlStream = env.addSource(new FlinkKafkaConsumer<String>("rule-demo", new SimpleStringSchema(), props));
MapStateDescriptor<String, KieSession> mapStateDescriptor = new MapStateDescriptor<>("rule_state", String.class, KieSession.class);
BroadcastStream<String> broadcast = drlStream.broadcast(mapStateDescriptor);
BroadcastConnectedStream<DataBean, String> connect = dataBeanStream.connect(broadcast);
connect.process(new BroadcastProcessFunction<DataBean, String, String>() {
@Override
public void processElement(DataBean dataBean, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, KieSession> state = ctx.getBroadcastState(mapStateDescriptor);
Iterable<Map.Entry<String, KieSession>> entries = state.immutableEntries();
for (Map.Entry<String, KieSession> entry : entries) {
KieSession kieSession = entry.getValue();
kieSession.insert(dataBean);
kieSession.fireAllRules();
out.collect(dataBean.getResult());
}
}
@Override
public void processBroadcastElement(String canalBinlog, Context ctx, Collector<String> out) throws Exception {
CanalBean canalBean = JSON.parseObject(canalBinlog, CanalBean.class);
BroadcastState<String, KieSession> state = ctx.getBroadcastState(mapStateDescriptor);
List<DbRecord> dbRecordList = canalBean.getData();
for (DbRecord dbRecord : dbRecordList) {
String rule_name = dbRecord.getRule_name();
String drl_string = dbRecord.getDrl_string();
KieHelper kieHelper = new KieHelper();
kieHelper.addContent(drl_string, ResourceType.DRL);
KieSession kieSession = kieHelper.build().newKieSession();
String operationType = canalBean.getType();
String online = dbRecord.getOnline();
if ("INSERT".equals(operationType) || ("UPDATE".equals(operationType) && "1".equals(online))) {
log.info("注入一条规则,rule_name: {}",rule_name);
state.put(rule_name, kieSession);
} else if ("DELETE".equals(operationType) || ("UPDATE".equals(operationType) && "0".equals(online))) {
log.info("删除一条规则,rule_name: {}",rule_name);
state.remove(rule_name);
}
}
}
}).print();
env.execute();
}
}
注:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DbRecord {
private String id;
private String rule_name;
private String drl_string;
private String online;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataBean {
private String data;
private String result;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CanalBean {
private List<DbRecord> data;
private String type;
}