Flink个人学习整理-Join和Hive Catalog篇(十五)
Flink Join
FlinkSQL中的状态永久保存 默认:PT0S 永久保存
public class FlinkSQL06_Join {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
tableEnvironment.getConfig().getIdleStateRetention();
tableEnvironment.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
SingleOutputStreamOperator<TableA> aDS = env.socketTextStream("localhost", 9999)
.map(line -> {
String[] strings = line.split(",");
return new TableA(strings[0], strings[1]);
});
SingleOutputStreamOperator<TableB> bDS = env.socketTextStream("localhost", 9998)
.map(line -> {
String[] strings = line.split(",");
return new TableB(strings[0],Integer.parseInt(strings[1]));
});
tableEnvironment.createTemporaryView("tableA",aDS);
tableEnvironment.createTemporaryView("tableB",bDS);
tableEnvironment.sqlQuery(" select * from tableA a join tableB b on a.id = b.id")
.execute()
.print();
env.execute();
}
}
Hive Catalog
public class FlinkSQL01_Catalog {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
HiveCatalog hiveCatalog = new HiveCatalog("myclusterHive", "default", "input");
tableEnvironment.registerCatalog("myclusterHive",hiveCatalog);
tableEnvironment.useCatalog("myclusterHive");
tableEnvironment.executeSql("show tables")
.print();
}
}
|