① map, flatMap
map: takes one element as input and produces exactly one element as output.
flatMap: a flattening operation; the input element is flattened, so there is no constraint on the number of output records: 0, 1, or many. (A minimal flatMap sketch is given after the min example below.)
② keyBy
Logically partitions the stream by the hash of the key. DataStream → KeyedStream: logically splits one stream into disjoint partitions.
③ Rolling aggregation operators (Rolling Aggregation)
These operators aggregate over each sub-stream of a KeyedStream: sum(), min(), max(), minBy(), maxBy().
Difference between min and minBy. Assume the following input data:
sensor_1,1547718199,35.8
sensor_1,1547718201,15.4
sensor_1,1547718302,6.7
sensor_1,1547718405,38.1
sensor_1,1547718500,3
sensor_1,1547718600,12.5
sensor_1,1547718700,13
min program:
val resultStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  })
  .keyBy("id")
  .min("temperature")
Result:
SensorReading(sensor_1,1547718199,35.8)
SensorReading(sensor_1,1547718199,15.4)
SensorReading(sensor_1,1547718199,6.7)
SensorReading(sensor_1,1547718199,6.7)
SensorReading(sensor_1,1547718199,3.0)
SensorReading(sensor_1,1547718199,3.0)
SensorReading(sensor_1,1547718199,3.0)
min only tracks the minimum of the given field; every other field keeps the value of the first record.
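Before the minBy version, a quick aside referenced in ① above: a minimal flatMap sketch. This is my own illustration rather than part of the original notes; the environment setup and the sample lines are assumptions made only so the snippet is self-contained and runnable.

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// hypothetical raw input lines, in the same CSV format as the sample data above
val rawStream: DataStream[String] = env.fromElements(
  "sensor_1,1547718199,35.8",
  "sensor_1,1547718201,15.4"
)

// map: exactly one output per input element (here, the length of the line)
val lengthStream: DataStream[Int] = rawStream.map(line => line.length)

// flatMap: 0, 1, or many outputs per input element (here, every comma-separated field)
val fieldStream: DataStream[String] = rawStream.flatMap(line => line.split(",").toList)

fieldStream.print()
env.execute("map vs flatMap sketch")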
minBy program:
val resultStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  })
  .keyBy("id")
  .minBy("temperature")
Result:
SensorReading(sensor_1,1547718199,35.8)
SensorReading(sensor_1,1547718201,15.4)
SensorReading(sensor_1,1547718302,6.7)
SensorReading(sensor_1,1547718302,6.7)
SensorReading(sensor_1,1547718500,3.0)
SensorReading(sensor_1,1547718500,3.0)
SensorReading(sensor_1,1547718500,3.0)
minBy finds the minimum of the given field and keeps the other fields of the record that produced that minimum.
④ reduce
KeyedStream → DataStream: an aggregation over a keyed stream that merges the current element with the previous aggregation result to produce a new value. The returned stream contains the result of every aggregation step, not just the final result of the last step.
ⅰ. Lambda expression:
def reduce(fun: (T, T) => T): DataStream[T] = {
}
The first T is the previous aggregation result; the second T is the newly arrived element. Program:
val resultStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  })
  .keyBy("id")
  .reduce((curSensor, newSensor) => {
    // keep the minimum temperature seen so far, paired with the latest timestamp
    SensorReading(curSensor.id, newSensor.timestamp,
      curSensor.temperature.min(newSensor.temperature))
  })
ⅱ.ReduceFunction
def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
}
Program:
val resultStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  })
  .keyBy("id")
  .reduce(new MyReduceFunction)

// same logic as the lambda version above, expressed as a ReduceFunction class
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
    SensorReading(t.id, t1.timestamp, t.temperature.min(t1.temperature))
  }
}
⑤ split, select
split splits one stream into several streams, DataStream → SplitStream; select picks the split sub-streams back out, SplitStream → DataStream.
Program:
val splitStream = dataStream
  .split(data => {
    if (data.temperature > 20) List("high") else List("low")
  })
val highStream = splitStream.select("high")
val lowStream = splitStream.select("low")
val allStream = splitStream.select("high", "low")
⑥ connect, coMap, union
connect: DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their respective types. After the two streams are connected they are merely placed inside one stream; internally each keeps its own data and form unchanged, and the two streams stay independent of each other.
Program:
val splitStream = dataStream
  .split(data => {
    if (data.temperature > 10) List("high") else List("low")
  })
val highStream = splitStream.select("high")
val lowStream = splitStream.select("low")
// the low stream is first mapped to a tuple, so the two connected streams have different types
val allStream = highStream.connect(lowStream.map(data => {
  (data.id, data.temperature)
}))
coMap: ConnectedStreams → DataStream: applied to a ConnectedStreams; it works like map and flatMap, applying a separate map or flatMap function to each of the two streams inside the ConnectedStreams. (A flatMap variant is sketched after the map example below.)
def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R] = {
}
fun1 is applied to the first stream and fun2 to the second; their outputs may have different shapes, as long as a common result type R can be inferred.
val resultStream = allStream
  .map(
    data => (data.id, data.temperature, "high"),  // fun1: applied to the SensorReading stream
    data => data                                  // fun2: applied to the (id, temperature) stream
  )
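As mentioned in the coMap description above, flatMap can also be applied per stream on a ConnectedStreams. A hedged sketch, reusing allStream from the connect example; the flattening logic itself is an assumption made purely for illustration.

// one flatMap function per input stream, both emitting String elements
val flatResultStream = allStream
  .flatMap(
    sensor => List(sensor.id, sensor.temperature.toString),  // applied to the SensorReading stream
    tuple => List(tuple._1)                                   // applied to the (id, temperature) stream
  )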
union: DataStream → DataStream: unions two or more DataStreams, producing a new DataStream that contains all elements from all of the input streams.
val allStream = highStream.union(lowStream)
val resultStream = allStream.map(data => {
  (data.id, data.temperature)
})
Differences between connect and union:
① With union the streams must have the same element type; union can operate on more than two streams, DataStream → DataStream.
② With connect the streams may have different types, which are reconciled afterwards in the coMap, ConnectedStreams → DataStream; connect can only operate on two streams.
A minimal side-by-side sketch follows.
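The sketch below is my own illustration of the two points above; stream1, stream2 and stream3 are hypothetical DataStream[SensorReading] values that are not defined in the original notes.

// union: any number of streams, but all of the same element type (DataStream[SensorReading] here)
val unioned = stream1.union(stream2, stream3)

// connect: exactly two streams whose element types may differ;
// the result is ConnectedStreams[SensorReading, (String, Double)],
// and the types are reconciled later in coMap / coFlatMap
val connected = stream1.connect(stream2.map(data => (data.id, data.temperature)))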