import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class IterateStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // Consume JSON strings from the "dwd" topic. KafkaUtils is a project-specific
        // helper (import omitted) that supplies the Kafka consumer Properties.
        DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer010<String>("dwd", new SimpleStringSchema(), KafkaUtils.comsumerProps()));
        // Parse each record into (userid, seconds, seconds): f1 is updated on every
        // pass through the loop, while f2 preserves the original value.
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> vod = source.map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(String s) throws Exception {
                JSONObject jn = JSON.parseObject(s);
                return new Tuple3<>(jn.getString("userid"), Integer.parseInt(jn.getString("seconds")), Integer.parseInt(jn.getString("seconds")));
            }
        });
        // iterate() turns the stream into an IterativeStream
        IterativeStream<Tuple3<String, Integer, Integer>> iterate = vod.iterate();
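        // The loop head emits both the original tuples from vod and every tuple later
        // fed back via closeWith(), so the two filters below are re-applied on each pass.
        // Note: a tuple that never satisfies the exit condition would circulate forever;
        // iterate(maxWaitTimeMillis) can be used to shut the loop down after a quiet period.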
        // Feedback condition: tuples that satisfy it go around the loop again
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> feedback = iterate.filter(new FilterFunction<Tuple3<String, Integer, Integer>>() {
            @Override
            public boolean filter(Tuple3<String, Integer, Integer> t) throws Exception {
                return t.f1 < 80;
            }
        });
        // Iteration body: add 10 to f1 on each pass
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> iterateBody = feedback.map(new MapFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(Tuple3<String, Integer, Integer> t) throws Exception {
                return new Tuple3<>(t.f0, t.f1 + 10, t.f2);
            }
        });
        // closeWith() defines the feedback edge: iterateBody's output re-enters the loop head
        iterate.closeWith(iterateBody);
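        // Flink requires the feedback stream to have the same parallelism as the stream
        // iterate() was called on; the global parallelism of 3 set above satisfies that.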
        // Exit condition: tuples meeting it leave the loop and flow downstream
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> result = iterate.filter(new FilterFunction<Tuple3<String, Integer, Integer>>() {
            @Override
            public boolean filter(Tuple3<String, Integer, Integer> t) throws Exception {
                return t.f1 >= 80;
            }
        });
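        // Example trace: {"userid":"u1","seconds":50} enters as (u1, 50, 50), passes
        // through the feedback map three times (50 -> 60 -> 70 -> 80), then satisfies
        // this filter and is printed as (u1, 80, 50).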
        result.print();
        env.execute();
    }
}