Main questions when migrating from Storm to Flink:
- Storm implements business logic in custom Bolt classes. How is this done in Flink?
Extend Flink's ProcessFunction class and implement the business logic in its processElement method.
- Storm dispatches records to different processing paths by business type. How is this done in Flink?
Use Flink's side-output feature: classify the original stream by some criterion and emit each class to its own side stream for separate processing.
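Conceptually, side-output splitting is just routing each record to one of several named sub-streams. A plain-Java sketch of the idea (no Flink dependency; the class and method names here are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// A tiny stand-in for OutputTag + ctx.output(): each record lands in the
// bucket chosen by a classifier function, one bucket per "side output".
class SideOutputDemo {
    static <T> Map<String, List<T>> split(List<T> input, Function<T, String> classifier) {
        Map<String, List<T>> buckets = new HashMap<>();
        for (T record : input) {
            buckets.computeIfAbsent(classifier.apply(record), k -> new ArrayList<>()).add(record);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("Beijing:100", "Shanghai:200", "Beijing:300");
        // Route by the city prefix, the way the SexSplitBolt below routes by sex
        Map<String, List<String>> streams = split(records, r -> r.split(":")[0]);
        System.out.println(streams.get("Beijing")); // [Beijing:100, Beijing:300]
    }
}
```

In Flink the buckets become `OutputTag`s, the classifier body goes into `processElement`, and each bucket is retrieved with `getSideOutput`, as the consumer program below shows.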
Overall processing flow:
- Flink reads data from Kafka as the initial stream initDataStream;
- The initial stream is classified, and records are emitted to sub-streams dataStream1, dataStream2, …, dataStreamn;
- Each sub-stream is processed by its own business logic: dataStream1_1, dataStream2_1, …, dataStreamn_1 become dataStream1_2, dataStream2_2, …, dataStreamn_2, and after several such steps finally dataStream1_x, dataStream2_y, …, dataStreamn_z;
- Finally a SinkFunction writes the results out.
Example: use Flink to filter personal records from the two cities Beijing and Shanghai, dropping records whose salary is below 1000 or above 20000, then output them in the format name_city_salary.
Input: personal record {name, age, sex, city, salary}
Output: name_city_salary, e.g. a_b_100
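Stripped of the Flink plumbing, the per-record logic is a filter plus a formatter. A minimal sketch (the Record type here is a hypothetical simplification of the Person class defined below):

```java
import java.util.Optional;

// Stand-alone version of the filter + output-format step.
class FormatDemo {
    // Hypothetical simplified stand-in for the Person class
    record Record(String name, String city, int salary) {}

    // Keep only salaries in [1000, 20000], formatted as name_city_salary
    static Optional<String> format(Record r) {
        if (r.salary() < 1000 || r.salary() > 20000) {
            return Optional.empty(); // record is dropped
        }
        return Optional.of(r.name() + "_" + r.city() + "_" + r.salary());
    }

    public static void main(String[] args) {
        System.out.println(format(new Record("a", "b", 100)));  // Optional.empty
        System.out.println(format(new Record("a", "b", 5000))); // Optional[a_b_5000]
    }
}
```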
Person class:
public class Person {
    private String name;
    private Integer age;
    private Sex sex;
    private Integer salary;
    private Country country;
    // getters and setters omitted for brevity
}
enum Sex {
    Male,
    Female
}
enum Country {
    Beijing,
    Shanghai
}
Producer program:
public class FlinkProducer {
    private static final Properties properties;
    static {
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<>("flink", new SimpleStringSchema(), properties);
        String[] initData = getData();
        DataStreamSource<String> source = env.fromElements(initData);
        source.addSink(flinkKafkaProducer);
        env.execute();
    }

    // Generate 100 random Person records serialized as JSON
    private static String[] getData() {
        String[] data = new String[100];
        for (int i = 0; i < 100; i++) {
            Person person = new Person();
            person.setName("name");
            person.setAge(18);
            int sex = (int) (Math.random() * 100);
            if (sex % 2 == 0) {
                person.setSex(Sex.Female);
            } else {
                person.setSex(Sex.Male);
            }
            int country = (int) (Math.random() * 100);
            if (country % 2 == 0) {
                person.setCountry(Country.Beijing);
            } else {
                person.setCountry(Country.Shanghai);
            }
            person.setSalary((int) (Math.random() * 50000)); // salary in [0, 50000)
            data[i] = JsonUtils.serialize(person);
        }
        return data;
    }
}
Consumer program:
public class FlinkConsumer {
    private static final Properties properties;
    private static final OutputTag<String> MALE = new OutputTag<>("male", Types.STRING);
    private static final OutputTag<String> FEMALE = new OutputTag<>("female", Types.STRING);
    static {
        properties = new Properties();
        properties.put("bootstrap.servers", "10.225.173.107:9092,10.225.173.108:9092,10.225.173.109:9092");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink", new SimpleStringSchema(), properties);
        DataStreamSource<String> initDataStream = env.addSource(flinkKafkaConsumer);
        // Split the initial stream into two side outputs by sex
        SingleOutputStreamOperator<String> process = initDataStream.process(new SexSplitBolt());
        DataStream<String> dataStream_male = process.getSideOutput(MALE);
        DataStream<String> dataStream_female = process.getSideOutput(FEMALE);
        // Drop records whose salary falls outside [1000, 20000]
        SingleOutputStreamOperator<String> dataStream_male_1 = dataStream_male.process(new SalarySplitBolt());
        SingleOutputStreamOperator<String> dataStream_female_1 = dataStream_female.process(new SalarySplitBolt());
        // A further per-stream processing step: raise each salary by 20%
        SingleOutputStreamOperator<String> dataStream_male_1_2 = dataStream_male_1.process(new SalaryAddBolt());
        SingleOutputStreamOperator<String> dataStream_female_1_2 = dataStream_female_1.process(new SalaryAddBolt());
        dataStream_male_1_2.addSink(new OutputBolt());
        dataStream_female_1_2.addSink(new OutputBolt());
        env.execute();
    }

    static class SexSplitBolt extends ProcessFunction<String, String> {
        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSex() == Sex.Male) {
                context.output(MALE, s);
            } else {
                context.output(FEMALE, s);
            }
        }
    }

    static class SalarySplitBolt extends ProcessFunction<String, String> {
        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            if (person.getSalary() < 1000 || person.getSalary() > 20000) {
                return; // drop the record
            }
            collector.collect(s);
        }
    }

    static class SalaryAddBolt extends ProcessFunction<String, String> {
        @Override
        public void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) {
            Person person = JsonUtils.deserialize(s, Person.class);
            person.setSalary((int) (person.getSalary() * 1.2));
            // Re-serialize the updated person; emitting the original string would lose the change
            collector.collect(JsonUtils.serialize(person));
        }
    }

    static class OutputBolt implements SinkFunction<String> {
        @Override
        public void invoke(String value, Context context) {
            Person person = JsonUtils.deserialize(value, Person.class);
            System.out.println("Processed record: " + person.getName() + "_" + person.getCountry() + "_" + person.getSalary());
        }
    }
}
Console output:
Processed record: name_Beijing_9320
Processed record: name_Beijing_1049
Processed record: name_Beijing_11548
Processed record: name_Shanghai_16721
Processed record: name_Beijing_3638
Processed record: name_Shanghai_18862
Processed record: name_Shanghai_8183
Processed record: name_Shanghai_18627
Processed record: name_Shanghai_16510
Processed record: name_Beijing_18983
Processed record: name_Beijing_10734
Processed record: name_Shanghai_1594
Processed record: name_Beijing_3640
Processed record: name_Shanghai_17757
Processed record: name_Shanghai_16700
Processed record: name_Beijing_1175
Processed record: name_Beijing_11788
Processed record: name_Shanghai_15732
Processed record: name_Beijing_15137
Processed record: name_Beijing_12853
Processed record: name_Beijing_11548
Processed record: name_Shanghai_1188
Processed record: name_Shanghai_3202
Processed record: name_Shanghai_3986
Processed record: name_Shanghai_8193
Processed record: name_Beijing_16093
Processed record: name_Beijing_8274
Processed record: name_Beijing_3630
Processed record: name_Beijing_11962
Processed record: name_Beijing_14441
Processed record: name_Shanghai_5312
Processed record: name_Beijing_1815
Processed record: name_Shanghai_11287
Processed record: name_Beijing_5848
Processed record: name_Beijing_11056
Processed record: name_Shanghai_12407
Processed record: name_Shanghai_12652
Processed record: name_Beijing_19864
Processed record: name_Shanghai_18935
Processed record: name_Shanghai_16894
Processed record: name_Shanghai_9732
36570 [TaskExecutorLocalStateStoresManager shutdown hook] INFO o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.