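Flink's side output feature can be used to split a data stream. You create a stream tag (OutputTag), emit records to that tag from inside a processing function, and then pass a matching OutputTag to getSideOutput(OutputTag) on the stream returned by that operator (the parent stream) to obtain the child stream.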
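Every tag carries an id, and tags are compared by that id, so the emitting code and the getSideOutput call do not have to share one OutputTag object: two tags with the same id (and the same element type) refer to the same side output. The tag can therefore be created inline at each call site, either as an anonymous inner class (new OutputTag<String>("num") {}) or, as in the example below, with explicit type information (new OutputTag<>("num", Types.STRING)).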
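Why two separate tag objects can address the same data is easier to see in a toy model. The sketch below is plain Java and is not Flink code: Tag and Router are hypothetical stand-ins, and the only assumption carried over is that a tag's equality depends solely on its id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class TagIdSketch {
    /** Hypothetical stand-in for a stream tag: equality depends only on the id. */
    static final class Tag {
        private final String id;

        Tag(String id) {
            this.id = Objects.requireNonNull(id);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Hypothetical router: stores every emitted value under its tag. */
    static final class Router {
        private final Map<Tag, List<String>> outputs = new HashMap<>();

        void output(Tag tag, String value) {
            outputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }

        List<String> getSideOutput(Tag tag) {
            return outputs.getOrDefault(tag, new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        Router router = new Router();
        router.output(new Tag("num"), "12345");
        router.output(new Tag("word"), "number");
        // A different Tag instance with the same id finds the same data.
        System.out.println(router.getSideOutput(new Tag("num")));  // [12345]
        System.out.println(router.getSideOutput(new Tag("word"))); // [number]
    }
}
```

In a real job it is still common to keep each OutputTag in a static final field, which avoids allocating a new tag for every record.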
Example: classify the characters of an input string and route the letters, digits, and symbols to separate child streams for processing.
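import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Producer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka broker addresses.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        // A single test record that mixes letters, digits, and symbols.
        DataStreamSource<String> source = env.fromElements("n1!u2m%3$b@*4e5r");
        // Write the record to the "test" topic.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        env.execute();
    }
}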
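import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Consumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        // Read raw strings from the "test" topic.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                StringBuilder num = new StringBuilder();
                StringBuilder word = new StringBuilder();
                StringBuilder other = new StringBuilder();
                // Sort each character into digits, lowercase letters, or everything else.
                for (char c : s.toCharArray()) {
                    if (c >= '0' && c <= '9') {
                        num.append(c);
                    } else if (c >= 'a' && c <= 'z') {
                        word.append(c);
                    } else {
                        other.append(c);
                    }
                }
                // Emit each group to its own side output; nothing goes to the main output.
                context.output(new OutputTag<>("num", Types.STRING), num.toString());
                context.output(new OutputTag<>("word", Types.STRING), word.toString());
                context.output(new OutputTag<>("other", Types.STRING), other.toString());
            }
        });
        // Tags with the same id select the matching side outputs.
        // print() adds the ':' and the subtask index after the label itself.
        process.getSideOutput(new OutputTag<>("num", Types.STRING)).print("digits");
        process.getSideOutput(new OutputTag<>("word", Types.STRING)).print("letters");
        process.getSideOutput(new OutputTag<>("other", Types.STRING)).print("symbols");
        env.execute();
    }
}
Start the Consumer first, then the Producer. Console output (the number before ">" is the index of the printing subtask and will vary):
digits:7> 12345
letters:7> number
symbols:7> !%$@*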
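The classification loop inside processElement can be checked without Kafka or a Flink cluster. The following is a minimal sketch that copies the same character tests into a plain Java method; CharClassifier and classify are names introduced here for illustration only.

```java
public class CharClassifier {
    /** Returns {digits, lowercase letters, everything else}, keeping input order within each group. */
    static String[] classify(String s) {
        StringBuilder num = new StringBuilder();
        StringBuilder word = new StringBuilder();
        StringBuilder other = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c >= '0' && c <= '9') {
                num.append(c);
            } else if (c >= 'a' && c <= 'z') {
                word.append(c);
            } else {
                other.append(c);
            }
        }
        return new String[] {num.toString(), word.toString(), other.toString()};
    }

    public static void main(String[] args) {
        String[] parts = classify("n1!u2m%3$b@*4e5r");
        System.out.println("digits:  " + parts[0]); // digits:  12345
        System.out.println("letters: " + parts[1]); // letters: number
        System.out.println("symbols: " + parts[2]); // symbols: !%$@*
    }
}
```

Because the letter test only covers 'a' to 'z', uppercase letters fall into the third group; Character.isLetter(c) would be one way to widen the check if that is not intended.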
Example: parse a raw stock string read from Kafka, write the parsed result to another topic, and read it back from that topic.
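import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class Producer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka broker addresses.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        // One raw stock record made of comma-separated key:value pairs.
        DataStreamSource<String> source = env.fromElements("name:五粮液,code:000858,price:172");
        // Write the record to the "test" topic.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);
        source.addSink(producer);
        env.execute();
    }
}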
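import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class Consumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
        // Read the raw stock strings from the "test" topic.
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        DataStreamSource<String> source = env.addSource(consumer);
        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
                Stock stock = new Stock();
                // Each comma-separated part is a key:value pair.
                for (String info : s.split(",")) {
                    String[] split = info.split(":");
                    if (split.length < 2) {
                        continue; // skip parts without a value instead of failing on split[1]
                    }
                    switch (split[0]) {
                        case "name":
                            stock.setName(split[1]);
                            break;
                        case "code":
                            stock.setCode(split[1]);
                            break;
                        case "price":
                            stock.setPrice(Integer.parseInt(split[1]));
                            break;
                        default:
                            break;
                    }
                }
                collector.collect(stock.toString());
            }
        });
        // Write the parsed records to the "stock" topic.
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("stock", new SimpleStringSchema(), properties);
        process.addSink(producer);
        // Read the "stock" topic back within the same job and print it.
        // print() adds the ':' and the subtask index after the label itself.
        FlinkKafkaConsumer<String> stockConsumer = new FlinkKafkaConsumer<>("stock", new SimpleStringSchema(), properties);
        env.addSource(stockConsumer).print("stock");
        env.execute();
    }

    static class Stock {
        private String name;
        private String code;
        private Integer price;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public void setCode(String code) {
            this.code = code;
        }

        public Integer getPrice() {
            return price;
        }

        public void setPrice(Integer price) {
            this.price = price;
        }

        @Override
        public String toString() {
            return "Stock{" +
                    "name='" + name + '\'' +
                    ", code='" + code + '\'' +
                    ", price=" + price +
                    '}';
        }
    }
}
Start the Consumer first, then the Producer. Console output (the number before ">" is the index of the printing subtask and will vary):
stock:3> Stock{name='五粮液', code='000858', price=172}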
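The key:value parsing step can also be exercised on its own, without Kafka. This is a rough sketch in plain Java rather than the job's actual code: StockLineParser and parse are hypothetical names, the stock name is romanized to keep the snippet ASCII-only, and splitting with a limit of 2 is an added assumption so that a value containing ':' stays intact.

```java
public class StockLineParser {
    /** Parses "name:..,code:..,price:.." and renders it in the same shape as Stock.toString(). */
    static String parse(String line) {
        String name = null;
        String code = null;
        Integer price = null;
        for (String info : line.split(",")) {
            // Limit 2: only the first ':' separates the key from the value.
            String[] kv = info.split(":", 2);
            if (kv.length < 2) {
                continue; // ignore parts that have no value
            }
            switch (kv[0]) {
                case "name":
                    name = kv[1];
                    break;
                case "code":
                    code = kv[1];
                    break;
                case "price":
                    price = Integer.parseInt(kv[1]);
                    break;
                default:
                    break; // unknown keys are skipped
            }
        }
        return "Stock{name='" + name + "', code='" + code + "', price=" + price + "}";
    }

    public static void main(String[] args) {
        // prints: Stock{name='Wuliangye', code='000858', price=172}
        System.out.println(parse("name:Wuliangye,code:000858,price:172"));
    }
}
```

Fields missing from the input stay null, and a non-numeric price still throws NumberFormatException, so a production job would probably want to catch that and route the bad record elsewhere.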