KeyBy
- KeyBy算子是定义传输过程(Hash分区)而不处理数据本身;所有定义传输过程的算子是不能设置并行度的。
- keyBy对元组可以用下标
- DataStream → KeyedStream:输入时DataStream,输出时KeyedStream
- keyBy算子返回值是KeyedStream<T,Key>,泛型中,key在后面,元素在前面
- 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,根据key的hash值 % 并行度 决定数据流向哪个并行度(分区)
注意: - keyedStream的聚合计算只是基于当前key的Stream,和分区无关;尽管分区个数有限,不少key会因为
hash值 % 并行度 相同分到一个分区进行计算处理,但是keyedStream的计算逻辑仅限于key。
基于KeyedStream的滚动聚合算子(Rolling Aggregation)
滚动聚合算子是做聚合用的,这些算子可以对KeyedStream的每一个支流(一个支流表示key相同的一个组)做聚合
- sum()
- min()
- minBy()
- maxBy()
- Reduce()
max
特点: max字段输出当前组内的最大值,非分组字段和组内第一个保持一致
maxBy
特点:maxBy字段为组内最大值,且非分组字段和组内最大值保持一致。
总结:maxBy和max的区别在于
非比较字段和谁保持一致,maxBy那么和组内最大值保持一致,max和组内第一条数据保持一致。
Reduce
- KeyedStream → DataStream
- 对一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
演示
- 需求:使用reduce聚合,取最小的温度值,并输出当前的时间戳
@Test
public void reduce(){
SingleOutputStreamOperator<SensorReading> map = source.map(new MyMapFunction());
KeyedStream<SensorReading, String> sensorReadingStringKeyedStream = map.keyBy(new KeySelector<SensorReading, String>() {
@Override
public String getKey(SensorReading sensorReading) throws Exception {
return sensorReading.getName();
}
});
SingleOutputStreamOperator<SensorReading> reduce = sensorReadingStringKeyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading s1, SensorReading s2) throws Exception {
return new SensorReading(s2.getName(), s2.getTs(), Math.min(s1.getTemp(), s2.getTemp()));
}
});
reduce.print();
}
多个DataStream的操作算子
Split 和 Select
Split
- DataStream → SplitStream
- 根据某些特征把一个DataStream拆分成两个或者多个DataStream。
- 需要注意的是,SplitStream是一个Stream,其内部包含了两个DataStream
Select
- SplitStream→DataStream
- 从一个SplitStream中获取一个或者多个DataStream。
- Split将流分为两个流,但是还没有拆分成实际的两个流,就需要select将分流单独拎出来
@Test
public void Split() {
SingleOutputStreamOperator<SensorReading> map = source.map(new MyMapFunction());
SplitStream<SensorReading> splitStream = map.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
return (sensorReading.getTemp() > 30) ?
Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
highTempStream.print("high");
lowTempStream.print("low");
allTempStream.print("all");
}
说明:split和select进行分流、选流的操作已经过时了;现在建议用测输出流
Connect和 CoMap
Connect
- DataStream → ConnectedStreams
- 连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap
- ConnectedStreams → DataStream
- 作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。
额外说明: 1.connect可以将两个不同类型的流合并的一起,join也可以。 2.CoMap和FlatMap作用在ConnectedStreams中的每一个Stream
CoMap的意义就是统一两个流的数据类型,各自返回任意类型也可
Union
- DataStream → DataStream
- 对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
DataStream unionStream = highTempStream.union(lowTempStream);
Connect与 Union 区别: 1. Union的两个流的类型必须是一样,Union后的流还是DataStream;也就是说Union后,两个流真的合并成了一个流 2. Connect可以不一样,Connect只是形成ConnectedStream,在其内部仍有两个各自独立的流;在之后的coMap中可以调整成为一样的,也可以各自输出各自的 3. Connect只能操作两个流,Union可以操作多个。
join
- Join和Connect一样,支持两个数据类型不同的流进行合并
- Join的两个流必须是k-v类型的
- Flink的Join操作分为两大类:window join和interval join
Flink中流的转换
|