1: map, flatMap, filter
package com.atguigu.transfrom;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Demonstrates the basic Flink DataStream transformations: map, flatMap and filter.
 * Reads lines from a local text file and applies each operator to the same source stream.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so printed output is in a single, deterministic task slot.
        env.setParallelism(1);
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\hello.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);

        // map: return the character length of each line.
        SingleOutputStreamOperator<Integer> map = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // flatMap: split each line on spaces and emit only the words starting with "h".
        SingleOutputStreamOperator<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(" ");
                for (String s : split) {
                    if (s.startsWith("h")) {
                        out.collect(s);
                    }
                }
            }
        });

        // filter: keep only lines starting with "h".
        SingleOutputStreamOperator<String> filter = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // Return the predicate directly instead of if/else returning true/false.
                return value.startsWith("h");
            }
        });

        flatMap.print();
        env.execute();
    }
}
2:KeyBy
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
3:滚动聚合
这些算子可以针对 KeyedStream 的每一个支流做聚合。
- sum()
- min()
- max()
- minBy()
- maxBy()
package com.atguigu.transfrom;
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates keyBy and rolling aggregations (max vs maxBy) on a keyed stream
 * of SensorReading records parsed from a CSV text file (id,timestamp,temperature).
 */
public class Test2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            // parseXxx instead of the deprecated boxing constructors new Long(...)/new Double(...).
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // Group by sensor id using the type-safe method reference
        // (the string-based keyBy("id") overload is deprecated).
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);

        // Rolling aggregations:
        // max only updates the temperature field; all other fields keep their first-seen values.
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.max("temperature");
        // maxBy emits the whole record that holds the maximum temperature,
        // so the other fields belong to that same record.
        SingleOutputStreamOperator<SensorReading> maxBy = keyedStream.maxBy("temperature");

        maxBy.print();
        env.execute();
    }
}
4:Reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值。返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
package com.atguigu.transfrom;
import com.atguigu.bean.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates reduce on a keyed stream: for each sensor id, emit a running record
 * carrying the latest timestamp and the maximum temperature seen so far.
 */
public class Test3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String path = "D:\\大数据组件API\\Flink\\Flink01\\src\\main\\resources\\test.txt";
        DataStreamSource<String> dataStream = env.readTextFile(path);
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] split = value.split(",");
            // parseXxx instead of the deprecated boxing constructors new Long(...)/new Double(...).
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // Group by sensor id using the type-safe method reference
        // (the string-based keyBy("id") overload is deprecated).
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);

        // reduce combines the previous aggregate (value1) with the new element (value2):
        // keep the id, take the newest timestamp, and track the running max temperature.
        // Every intermediate aggregate is emitted downstream, not just the final one.
        SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        reduce.print();
        env.execute();
    }
}
|