案例说明
利用 Flink 的 Source(SourceFunction)功能实现一个自定义的实时数据源。实现效果:将实时生成的商品数据按 id 的奇偶分流为 even 和 odd 两个流,再以商品名称相同为条件对这两个流进行 join,最后输出 join 的结果。
Flink 1.10 版本:基于 split/select(OutputSelector)的实现(该 API 已被标记为 deprecated,Flink 1.12 中已移除)
/**
 * A toy unbounded source that emits one randomly generated {@code Item} about once per second.
 *
 * <p>Each item gets a random id in [0, 100) and a name drawn uniformly from
 * {@code HAT}, {@code TIE} and {@code SHOE}. Emission continues until {@link #cancel()} is called.
 */
public class MyStreamingSource implements SourceFunction<Item> {

    /** Candidate product names; one is picked uniformly at random per item. */
    private static final String[] NAMES = {"HAT", "TIE", "SHOE"};

    /**
     * Loop flag for {@link #run}. Flink calls {@code cancel()} from a different thread than the one
     * executing {@code run()}, so the field must be volatile for the stop request to become visible.
     */
    private volatile boolean isRunning = true;

    /** One generator reused for every item instead of allocating a new Random per draw. */
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(generateItem());
            // Throttle to roughly one record per second.
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Builds one item with a random id in [0, 100) and a random name from {@link #NAMES}. */
    private Item generateItem() {
        int id = random.nextInt(100);
        Item item = new Item();
        item.setName(NAMES[random.nextInt(NAMES.length)]);
        item.setId(id);
        return item;
    }
}
class StreamingDemo {
public static void main(String[] args) throws Exception {
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
@Override
public Item map(Item item) throws Exception {
return item;
}
});
DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("even");
DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("odd");
bsTableEnv.createTemporaryView("evenTable", evenSelect, "name,id");
bsTableEnv.createTemporaryView("oddTable", oddSelect, "name,id");
Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
queryTable.printSchema();
bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){})).print();
bsEnv.execute("streaming sql job");
}
}
Flink 1.12 版本:基于 OutputTag(侧输出 side output)的实现
/**
 * A toy unbounded source that emits one randomly generated {@code Item} about once per second.
 *
 * <p>Each item gets a random id in [0, 100) and a name drawn uniformly from
 * {@code HAT}, {@code TIE} and {@code SHOE}. Emission continues until {@link #cancel()} is called.
 */
public class MyStreamingSource implements SourceFunction<Item> {

    /** Candidate product names; one is picked uniformly at random per item. */
    private static final String[] NAMES = {"HAT", "TIE", "SHOE"};

    /**
     * Loop flag for {@link #run}. Flink calls {@code cancel()} from a different thread than the one
     * executing {@code run()}, so the field must be volatile for the stop request to become visible.
     */
    private volatile boolean isRunning = true;

    /** One generator reused for every item instead of allocating a new Random per draw. */
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(generateItem());
            // Throttle to roughly one record per second.
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /** Builds one item with a random id in [0, 100) and a random name from {@link #NAMES}. */
    private Item generateItem() {
        int id = random.nextInt(100);
        Item item = new Item();
        item.setName(NAMES[random.nextInt(NAMES.length)]);
        item.setId(id);
        return item;
    }
}
/**
 * Flink 1.12 demo: routes each item to an "even" or "odd" side output according to its id,
 * registers both side outputs as temporary views, and prints their SQL join on equal names.
 */
class StreamingDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Tags are created as anonymous subclasses ({}) so Flink can recover the element type.
        final OutputTag<Item> evenTag = new OutputTag<Item>("even") {};
        final OutputTag<Item> oddTag = new OutputTag<Item>("odd") {};

        // Identity map: passes every item through unchanged.
        SingleOutputStreamOperator<Item> items = env
                .addSource(new MyStreamingSource())
                .map(new MapFunction<Item, Item>() {
                    @Override
                    public Item map(Item item) throws Exception {
                        return item;
                    }
                });

        // Nothing is written to the main output; every item lands in exactly one side output.
        SingleOutputStreamOperator<Item> routed = items.process(new ProcessFunction<Item, Item>() {
            @Override
            public void processElement(Item value, Context ctx, Collector<Item> out) throws Exception {
                boolean evenId = value.getId() % 2 == 0;
                ctx.output(evenId ? evenTag : oddTag, value);
            }
        });

        DataStream<Item> evenStream = routed.getSideOutput(evenTag);
        DataStream<Item> oddStream = routed.getSideOutput(oddTag);
        tableEnv.createTemporaryView("evenTable", evenStream);
        tableEnv.createTemporaryView("oddTable", oddStream);

        String joinSql = "select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name";
        Table joined = tableEnv.sqlQuery(joinSql);

        // Each printed record is (Boolean flag, row); the row is mapped onto the Tuple4 by position.
        TypeInformation<Tuple4<Integer, String, Integer, String>> rowType =
                TypeInformation.of(new TypeHint<Tuple4<Integer, String, Integer, String>>() {});
        tableEnv.toRetractStream(joined, rowType).print();

        env.execute("streaming sql job");
    }
}