1、代码案例
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Flink streaming iteration example.
 *
 * <p>A sequence created with {@code generateSequence(0, 100)} enters an iteration whose
 * body subtracts one from every element. Decremented values that are still greater than
 * zero are fed back to the iteration head, and the same positive values are printed.
 */
public class Demo04 {

    /**
     * Assembles the iterative pipeline and submits it under the job name "iterate".
     *
     * @param args command-line arguments (not used)
     * @throws Exception if executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step applied on every pass through the loop, and the predicate shared by
        // the feedback edge and the printed output.
        MapFunction<Long, Long> decrement = value -> value - 1;
        FilterFunction<Long> isPositive = value -> value > 0;

        DataStreamSource<Long> numbers = environment.generateSequence(0, 100);
        IterativeStream<Long> loopHead = numbers.iterate();

        // Loop body: each element, whether fresh from the source or fed back, loses one.
        SingleOutputStreamOperator<Long> decremented = loopHead.map(decrement);

        // Feedback edge: only values still above zero re-enter the loop.
        SingleOutputStreamOperator<Long> feedback = decremented.filter(isPositive);
        loopHead.closeWith(feedback);

        // Output edge: print every positive intermediate value.
        decremented.filter(isPositive).print();

        environment.execute("iterate");
    }
}
2、测试数据:最终结果递减到 1 后,输出流不再输出数据。输出示例:2> 1 4> 3 3> 2 5> 4 4> 2 3> 1 5> 3 4> 1 5> 2 5> 1
|