一.flink算子的执行过程
env -> source -> transform -> sink
3.transform算子
转换算子,把当前的DataStream转化为另一个DataStream
1)简单转换算子(DataStream)
不涉及到分组,不涉及拆分与合并,可以以one-to-one传递数据的流
前提:在虚拟机开启8888端口,在端口发送数据,在控制台进行接收原数据
nc -lk 8888
?注意:转换算子需要用到隐式转换
import org.apache.flink.streaming.api.scala._
/**
* 创建一个温度传感器的类,把三个字段以参数的形式进行传递
* @param id id
* @param timestamp 时间戳
* @param temperature 温度值
*/
case class SensorReading(id:String,timestamp:Long,temperature:Double)
/**
* 简单转换流
* @param env 运行环境
* @param dataStreamOriginal 原始数据流
*/
def transform_simple(env:StreamExecutionEnvironment,dataStreamOriginal:DataStream[String]) = {
//转换算子需要用到隐式转换
//import org.apache.flink.streaming.api.scala._
//把得到的数据进行切割,以类的参数的形式传出数据
val dataStream_sensorReading:DataStream[SensorReading] = dataStreamOriginal.map(line=>{
val fields = line.split(",")
SensorReading(fields(0),fields(1).toLong,fields(2).toDouble)
})
//把得到的数据源输出
dataStream_sensorReading.print()
//把结果进行返回,传给其他参数
dataStream_sensorReading
}
def main(args: Array[String]): Unit = {
//创建实时流处理的环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度为1
env.setParallelism(1)
//创建原始数据流
val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)
transform_simple(env,dataStream_Original)
//实时数据流必须设置开启启动执行的语句
env.execute("transform")
}
b.键控算子(keyedStream)
按照key相同的进行分组?
滚动聚合算子,一定放在键控算子之后使用
聚合算子的特性
min:同一个key下,取当前字段的最小值,但是其他字段取同组内的第一条数据的值
minBy:聚合算子,minBy特点:同一个key下,取当前字段的最小值,其他字段取最小值的那条数据的内容
reduce:自定义取值方式
? ? ? ? ? ? ? ?两种形式:以匿名函数的形式和以ReduceFunction类形式
前提:在虚拟机开启8888端口,在端口发送数据,在控制台进行接收原数据
nc -lk 8888
/**
* 以继承类的形式
* 自定义reduce的类
*/
class MyReduce extends RichReduceFunction[SensorReading]{
override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
SensorReading(t.id,t1.timestamp,t.temperature.min(t1.temperature))
}
}
/**
* 键控转换算子
* @param env 运行环境
* @param dataStreamOriginal 原数据
*/
def transform_keyBy(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]): Unit ={
//把字符串类型的数据转成SensorReading类型的数据
//简单转换流已经进行转化,直接获取即可
val dataStream_sensorReading:DataStream[SensorReading] = transform_simple(env,dataStreamOriginal)
//把数据按照id为key进行分组 可以直接写字段名,也可以写索引值
val dataStream_keyById:KeyedStream[SensorReading,Tuple] = dataStream_sensorReading.keyBy(0)
//聚合算子,min特点:同一个key下,取当前字段的最小值,但是其他字段取同组内的第一条数据的值
val dataStream_min:DataStream[SensorReading] = dataStream_keyById.min("temperature")
//
val dataStream_minBy:DataStream[SensorReading] = dataStream_keyById.minBy("temperature")
//输出获取得到的数据
dataStream_min.print("min")
dataStream_minBy.print("minBy")
//通过reduce方法自定义取值方式,取最新时间的最小温度值
// 方法一:以匿名函数的形式写的
val dataStream_reduce:DataStream[SensorReading] = dataStream_keyById.reduce((currentData,newData)=>{
//id:按照分组的原Id timestamp:以最新的时间戳为主 temperature:原来的温度与最新的维度取最小值
SensorReading(currentData.id,newData.timestamp,currentData.temperature.min(newData.temperature))
})
//通过reduce方法自定义取值方式,取最新时间的最小温度值,
// 方法二:以ReduceFunction类形式写的
val dataStream_reduce1:DataStream[SensorReading] = dataStream_keyById.reduce(new MyReduce)
dataStream_reduce.print("reduce")
dataStream_reduce1.print("reduce1")
}
def main(args: Array[String]): Unit = {
//创建实时流处理的环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度为1
env.setParallelism(1)
//创建原始数据流
val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)
transform_keyBy(env,dataStream_Original)
//实时数据流必须设置开启启动执行的语句
env.execute("transform")
}
输出数据如下?
SensorReading(sensor_1,1547718200,25.0) reduce> SensorReading(sensor_1,1547718200,25.0) min> SensorReading(sensor_1,1547718200,25.0) minBy> SensorReading(sensor_1,1547718200,25.0) SensorReading(sensor_1,1547718201,30.0) reduce> SensorReading(sensor_1,1547718201,25.0) minBy> SensorReading(sensor_1,1547718200,25.0) min> SensorReading(sensor_1,1547718200,25.0) SensorReading(sensor_1,1547718202,10.0) reduce> SensorReading(sensor_1,1547718202,10.0) min> SensorReading(sensor_1,1547718200,10.0) minBy> SensorReading(sensor_1,1547718202,10.0)
?c.多流转换算子
? ?(1)分流(SplitStream)
1.split():是一个过期的方法,但是还可以用,传入参数 -> 返回集合类型
2.分流之后接收的数据是SplitStream,是DataStream的子类
3.分流之后直接输出,与没有分流没有什么区别,需要用dataStream_HC.select获取拆分后标识符,在对获取标识符后的数据进行拆分
/**
* 分流算子
* 划分条件:数据>=30 高温流 ; 数据<30 低温流
* @param env 运行环境
* @param dataStreamOriginal 原数据
* @return
*/
def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
//把字符串的类型转换为sensorReading,方便
val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
//拆分流: dataStream调用的split
//type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
//传入参数 -> 返回集合类型 hot or cool
//拆分后没有对数据进行改变,所以数据类型不变,例:分减
//把数据按照划分条件划分成高温流和低温流
val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
val temp = data.temperature
//用标识符获取对应数据
//温度>=30
/*if(temp>=30){
//高温流的一个标识
Seq("hot")
}else{
//温度<30
Seq("cool")
}*/
if(temp>=30){
//高温流的一个标识
Seq("hot")
}else if(temp>=20 && temp<30){
//20<=温度<30
Seq("comfortable")
}else{
Seq("cool")
}
})
//dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
//获取不同的数据可以进行不同的操作
//获取拆分后标识符为高温流的数据
val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
//获取拆分后标识符为低温流的数据
val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")
//输出对应数据并且给予标识符
dataStream_hot.print("hot")
dataStream_cool.print("cool")
dataStream_comfortable.print("comfortable")
dataStream_HC.print("hc")
//直接输出,看不出变化,一起输出三个变量
dataStream_HC.print("hcc")
}
def main(args: Array[String]): Unit = {
//创建实时流处理的环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度为1
env.setParallelism(1)
//创建原始数据流
val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)
transform_splitStream(env,dataStream_Original)
//实时数据流必须设置开启启动执行的语句
env.execute("transform")
}
(2)合流
1. 相同类型流:union
a.重点:类型相同 -> 同一个流进行操作
b.可以同时合并多个流,firstStream.union()的参数是不定长参数,可以合并多个流
/**
* 分流算子
* 划分条件:数据>=30 高温流 ; 数据<30 低温流
* @param env 运行环境
* @param dataStreamOriginal 原数据
* @return
*/
def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
//把字符串的类型转换为sensorReading,方便
val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
//拆分流: dataStream调用的split
//type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
//传入参数 -> 返回集合类型 hot or cool
//拆分后没有对数据进行改变,所以数据类型不变,例:分减
//把数据按照划分条件划分成高温流和低温流
val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
val temp = data.temperature
//用标识符获取对应数据
//温度>=30
if(temp>=30){
//高温流的一个标识
Seq("hot")
}else if(temp>=20 && temp<30){
//20<=温度<30
Seq("comfortable")
}else{
Seq("cool")
}
})
//dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
//获取不同的数据可以进行不同的操作
//获取拆分后标识符为高温流的数据
val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
//获取拆分后标识符为低温流的数据
val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")
//输出对应数据并且给予标识符
dataStream_hot.print("hot")
dataStream_cool.print("cool")
dataStream_comfortable.print("comfortable")
dataStream_HC.print("hc")
//直接输出,看不出变化,一起输出三个变量
dataStream_HC.print("hcc")
}
/**
* 把两个相同类型的流合并为一个流
* 重点:类型相同 -> 同一个进行操作
* @param firstStream 第一个流
* @param otherStream 第二个流
*/
//otherStream:DataStream[SensorReading]* : 不定长参数变成集合形式,不能直接接收
def transform_UnionStream(firstStream:DataStream[SensorReading],otherStream:DataStream[SensorReading]): Unit ={
//firstStream.union()的参数是不定长参数,可以合并多个流
//把两个SensorReading的流合并成一个流
//因为类型相同,所以可以统一输出
val dataStream_union:DataStream[SensorReading] = firstStream.union(otherStream)
dataStream_union.print("union")
}
def main(args: Array[String]): Unit = {
//创建实时流处理的环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置全局并行度为1
env.setParallelism(1)
//创建原始数据流
val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)
//transform_simple(env,dataStream_Original)
//transform_keyBy(env,dataStream_Original)
transform_splitStream(env,dataStream_Original)
//实时数据流必须设置开启启动执行的语句
env.execute("transform")
}
?2.?不同类型流:connect
a.首先采用简单转换算子,把相同类型的数据流转换为其他类型数据流
b.不同类型的流进行合并,由于流的类型不同,所有合并后的流的泛型无法统一,需要按照流的顺序进行写类型,注意顺序一致,和写入流对应的类型保持一致
c.不是DataStream的类型基本不能输出? dataStream_Connect.print()
d.把合并后的类型转换为DataStream类型,有两种方式
a1.对合并后的流进行处理,以匿名函数的形式进行装换,装换后的类型可以相同也可以不同
/**
* 合并不同类型的流
* @param firstStream 第一个流
* @param otherStream 第二个流
*/
def transform_ConnectStream(firstStream:DataStream[SensorReading],otherStream:DataStream[SensorReading]) = {
//采用简单转换算子
//为了实现不同类型的流合并,对其中一个流进行改造,把SensorReading转成了(String,Double)
val dataStream_Tuple:DataStream[(String,Double)] = firstStream.map(data=>{
(data.id,data.temperature)
})
//注意顺序一致,和写入流对应的类型保持一致
//不同类型的流进行合并,由于流的类型不同,所有合并后的流的泛型无法统一
val dataStream_Connect:ConnectedStreams[(String,Double),SensorReading] = dataStream_Tuple.connect(otherStream)
//不是DataStream的类型基本不能输出
//dataStream_Connect.print()
//参数:函数和类实现接口的写法
//对合并后的流进行处理,两个匿名函数类型的参数分别转换两个轮
val dataStream_ConnectResult:DataStream[String] = dataStream_Connect.map(
//第一个函数:元组
firstData =>{
firstData._1+"==="+firstData._2
},
//第二个函数:SensorReading
//想怎么写,就怎么写
//自定义写法:和第一个函数保持一致
secondData =>{
secondData.id+"==="+secondData.timestamp
}
)
/**
* 对两个参数进行操作可以转换为不同类行进行输出:Any
* 对不同流分别处理,但是以同一条流输出
*/
val dataStream_ConnectAny:DataStream[Any] = dataStream_Connect.map(
//第一个函数:元组
firstData =>{
firstData._1+"==="+firstData._2
},
//第二个函数:SensorReading
//想怎么写,就怎么写
//自定义写法:和第一个函数保持一致
secondData =>{
SensorReading
}
)
//对不同类型的流合并后进行输出
dataStream_ConnectResult.print("connectString")
dataStream_ConnectAny.print("connectAny")
dataStream_ConnectClass.print("connectClass")
}
/**
* 分流算子
* 划分条件:数据>=30 高温流 ; 数据<30 低温流
* @param env 运行环境
* @param dataStreamOriginal 原数据
* @return
*/
def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
//把字符串的类型转换为sensorReading,方便
val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
//拆分流: dataStream调用的split
//type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
//传入参数 -> 返回集合类型 hot or cool
//拆分后没有对数据进行改变,所以数据类型不变,例:分减
//把数据按照划分条件划分成高温流和低温流
val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
val temp = data.temperature
//用标识符获取对应数据
//温度>=30
/*if(temp>=30){
//高温流的一个标识
Seq("hot")
}else{
//温度<30
Seq("cool")
}*/
if(temp>=30){
//高温流的一个标识
Seq("hot")
}else if(temp>=20 && temp<30){
//20<=温度<30
Seq("comfortable")
}else{
Seq("cool")
}
})
//dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
//获取不同的数据可以进行不同的操作
//获取拆分后标识符为高温流的数据
val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
//获取拆分后标识符为低温流的数据
val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")
//输出对应数据并且给予标识符
dataStream_hot.print("hot")
dataStream_cool.print("cool")
dataStream_comfortable.print("comfortable")
dataStream_HC.print("hc")
//直接输出,看不出变化,一起输出三个变量
dataStream_HC.print("hcc")
//把刚才拆分出来的高温流和低温流进行不同类型的合并,成一个流进行处理
transform_ConnectStream(dataStream_hot,dataStream_cool)
}
?b1.实例化类进行转换类型
//继承CoMapFunction或者RichCoMapFunction
//RichCoMapFunction<IN1, IN2, OUT>三个泛型
//自定义类
class MyCoMap extends RichCoMapFunction[(String,Double),SensorReading,Any]{
override def map1(value: (String, Double)): Any = {
value._1+"==="+value._2
}
override def map2(value: SensorReading): Any = {
SensorReading
}
}
//方法二:实例化类进行转换类型
val dataStream_ConnectClass:DataStream[Any]= dataStream_Connect.map(new MyCoMap)
?
|