maven项目的文本文件与pom.xml 配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120555968 同样以wordcount为例
package transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Demonstrates keyBy + a custom reduce on a Flink DataStream.
 *
 * <p>Note: this is NOT a plain word count. The reduce below intentionally grows the
 * second tuple field by the key word's length on every incoming element, to show that
 * {@code reduce} may implement arbitrary aggregation logic over the accumulated state.
 */
public class WordCountKeyBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: read the sample file line by line.
        DataStream<String> lines = env.readTextFile("src/main/resources/hello.txt");
        env.setParallelism(3);

        // Split each line on single spaces and emit (word, 1) pairs.
        DataStream<Tuple2<String, Integer>> wordPairs = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        });

        // Partition the stream by the word itself (tuple field f0).
        KeyedStream<Tuple2<String, Integer>, Object> keyedWords = wordPairs.keyBy(tuple -> tuple.f0);

        // Custom aggregation: 'acc' is the value accumulated so far for this key,
        // 'incoming' is the newly arrived element. Because both share the same key,
        // acc.f0 and incoming.f0 are the same word; each step adds that word's
        // length to the running total in f1 (incoming is otherwise unused by design).
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced = keyedWords.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> acc, Tuple2<String, Integer> incoming) throws Exception {
                return new Tuple2<>(acc.f0, acc.f0.length() + acc.f1);
            }
        });

        reduced.print();
        env.execute();
    }
}
上面 keyBy 之后的 reduce 为自定义的聚合操作,其中:reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) 中 value1 为该 key 当前已聚合的状态(旧值),value2 为新到达的元素;
上面的代码中Tuple 第一个位置返回原有的单词,而Tuple 第二个位置每次都加一次当前单词的长度