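This article introduces the common transformation operators and user-defined functions in Flink.
I. Transformation Operators
1. Single-stream input
import org.apache.flink.streaming.api.scala._
// Shared environment for the examples below; sinks such as print() only run once env.execute() is called
val env = StreamExecutionEnvironment.getExecutionEnvironment
1.1 map, filter: map transforms each element into exactly one new element; filter keeps only the elements for which the predicate returns true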
env.fromElements(1,2,3,4,5,6)
.map(_*10)
.print()
env.fromElements(1,2,3,4,5,6)
.filter(_>5)
.print()
1.2 flatMap: maps each element to zero or more elements (here, the words of a line) and flattens them all into one stream
env.fromCollection(List("AA BB CC","aa bb cc"))
.flatMap(_.split(" "))
.print()
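To see what "flattening" means, here is a plain-Scala sketch on an ordinary List. It is not the Flink API, but Scala collections apply map and flatMap per element the same way the stream operators above do.

```scala
// Plain Scala collections, not Flink: map vs flatMap on the same input.
object FlatMapDemo {
  val lines: List[String] = List("AA BB CC", "aa bb cc")

  // map: exactly one output per input, so the result is a list of two arrays
  def withMap: List[Array[String]] = lines.map(_.split(" "))

  // flatMap: each input yields zero or more outputs, flattened into one list of words
  def withFlatMap: List[String] = lines.flatMap(_.split(" "))
}
```

With flatMap the downstream operator receives six separate words rather than two arrays.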
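1.3 keyBy
- Splits a stream into disjoint partitions; all elements with the same key go to the same partition
- By default, the partition is chosen from the hash code of each element's key
- The data is not split into separate streams; the result is still a single (keyed) stream, a KeyedStream
- Keying by field position (e.g. keyBy(0), now deprecated) requires Tuple input; a key selector function such as _._1 works with any type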
env.fromCollection(List("aa bb cc","aa bb nn"))
.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
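On an unbounded stream, keyBy(...).sum(1) does not wait for the input to end: it keeps one running total per key and emits the updated tuple for every incoming element. The plain-Scala sketch below is a simulation, not Flink code; the helper rollingSum is hypothetical, and a mutable Map stands in for Flink's keyed state while replaying the words from the example above.

```scala
// Simulation of keyBy(_._1).sum(1) on a finite input; not the Flink API.
object RollingSumDemo {
  // For each element, add its count to the running total of its key
  // and emit the updated (key, total) pair, as a keyed rolling sum does.
  def rollingSum(input: Seq[(String, Int)]): Seq[(String, Int)] = {
    val state = scala.collection.mutable.Map.empty[String, Int] // stands in for keyed state
    input.map { case (word, count) =>
      val total = state.getOrElse(word, 0) + count
      state(word) = total
      (word, total)
    }
  }

  val words: Seq[(String, Int)] =
    List("aa bb cc", "aa bb nn").flatMap(_.split(" ")).map((_, 1))
}
```

Here rollingSum(words) yields (aa,1), (bb,1), (cc,1), (aa,2), (bb,2), (nn,1). In a parallel Flink job the interleaving of different keys in the printed output may differ, but the sequence of totals for each key is the same.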
1.4 reduce: on a KeyedStream, combines the previously aggregated result with each new element and emits the updated result
env.fromCollection(List("aa bb cc","aa bb nn"))
.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.reduce(
(t1,t2)=>(t1._1,t1._2+t2._2)
)
.print()
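reduce generalizes the rolling sum: for each key, the last reduced value is kept, and the function is called with (previous result, new element). A plain-Scala simulation with a hypothetical rollingReduce helper (not Flink's implementation), using the same reduce function as the example above:

```scala
// Simulation of keyBy(key).reduce(f) on a finite input; not the Flink API.
object RollingReduceDemo {
  def rollingReduce[K, T](input: Seq[T])(key: T => K)(f: (T, T) => T): Seq[T] = {
    val state = scala.collection.mutable.Map.empty[K, T] // last reduced value per key
    input.map { elem =>
      val k = key(elem)
      // The first element of a key passes through unchanged; later ones are combined with the stored result
      val reduced = state.get(k).map(prev => f(prev, elem)).getOrElse(elem)
      state(k) = reduced
      reduced
    }
  }

  // The same reduce function as in the Flink example: keep the key, add the counts
  val sumCounts: ((String, Int), (String, Int)) => (String, Int) =
    (t1, t2) => (t1._1, t1._2 + t2._2)
}
```

With this function, reduce produces the same per-element output as sum(1) in the previous section.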
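1.5 Aggregations: rolling aggregation operators on a KeyedStream, such as sum, min, max, minBy and maxBy. min/max return the minimum/maximum value of the given field, whereas minBy/maxBy return the element that has that value in the field.
2. Multi-stream input
2.1 union
- Merges two or more DataStreams into one stream
- All input streams must have the same element type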
val input1 = env.fromElements(1, 2, 3, 4, 5, 6)
val input2 = env.fromElements(10,10,20,30)
val unionDataStream = input1.union(input2)
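2.2 connect
- Connecting two streams only places them in the same stream
- Internally each stream keeps its own data and type, and the two remain independent; unlike union, the two streams may have different element types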
val input1 = env.fromElements(1, 2, 3, 4, 5, 6)
val input3 = env.fromElements("aa", "bb", "cc")
import org.apache.flink.streaming.api.functions.co.CoMapFunction

val connStream: ConnectedStreams[Int, String] = input1.connect(input3)
connStream.map(new CoMapFunction[Int,String,String] {
override def map1(value: Int): String = {
"tom0" + value
}
override def map2(value: String): String = {
value.toUpperCase()
}
}).print()
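One way to picture ConnectedStreams[Int, String] is a single stream whose elements come either from the first input or from the second, each keeping its own type. The plain-Scala sketch below models that with Either (an analogy only, not how Flink represents connected streams) and applies the two methods of the CoMapFunction above: map1 to Left values and map2 to Right values.

```scala
// Models a connected stream as Either[Int, String]; an analogy, not the Flink API.
object CoMapDemo {
  def map1(value: Int): String = "tom0" + value       // same logic as map1 above
  def map2(value: String): String = value.toUpperCase // same logic as map2 above

  // Each element is handled by the method for the input it came from;
  // both methods must return the same output type (String here).
  def coMap(connected: Seq[Either[Int, String]]): Seq[String] =
    connected.map {
      case Left(i)  => map1(i)
      case Right(s) => map2(s)
    }
}
```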
2.3 coGroup: groups the elements of two streams by key within a window and passes both groups for each key to one function
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Each socket line is expected to look like "key value"; it is parsed into a (key, value) tuple
val input4 = env.socketTextStream("192.168.217.110", 9002)
  .map { s => val fields = s.split(" "); (fields(0), fields(1)) }
val input5 = env.socketTextStream("192.168.217.110", 9001)
  .map { s => val fields = s.split(" "); (fields(0), fields(1)) }

input4.coGroup(input5)
  .where(_._1)   // key of the first stream
  .equalTo(_._1) // key of the second stream
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
  // Fire on every element instead of waiting for the session to end; CountTrigger does not purge,
  // so each firing sees all elements collected in the window so far
  .trigger(CountTrigger.of[TimeWindow](1))
  .apply(new CoGroupFunction[(String, String), (String, String), String] {
    override def coGroup(first: java.lang.Iterable[(String, String)],
                         second: java.lang.Iterable[(String, String)],
                         out: Collector[String]): Unit = {
      val buffer = new StringBuilder
      buffer.append("DataStream first:\n")
      for ((k, v) <- first.asScala) buffer.append(s"$k=>$v\n")
      buffer.append("DataStream second:\n")
      for ((k, v) <- second.asScala) buffer.append(s"$k=>$v\n")
      out.collect(buffer.toString)
    }
  })
  .print()
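Unlike a join, coGroup passes the function both groups for a key even when one side is empty, so it can express inner joins, outer joins or anti-joins. A plain-Scala sketch of what the coGroup calls for one window see; this is a simulation on finite lists, and the coGroup helper here is hypothetical, not Flink's:

```scala
// Simulates coGroup over the contents of one window; not the Flink API.
object CoGroupDemo {
  def coGroup[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Map[K, (Seq[A], Seq[B])] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    // Every key seen on either side gets an entry, possibly with one empty side
    (l.keySet ++ r.keySet).map { k =>
      k -> (l.getOrElse(k, Seq.empty).map(_._2), r.getOrElse(k, Seq.empty).map(_._2))
    }.toMap
  }
}
```

For left = (a,1), (b,2), (a,3) and right = (a,x), (c,y), key a gets ([1, 3], [x]), b gets ([2], []) and c gets ([], [y]).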
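2.4 coMap, coFlatMap: map and flatMap over ConnectedStreams; a CoMapFunction/CoFlatMapFunction provides one method per input stream
3. Partitioning operators
3.1 Built-in partitioning
- dataStream.shuffle: random partitioning
- dataStream.rebalance: round-robin partitioning
- dataStream.rescale: round-robin partitioning, but only to a subset of the downstream instances, which avoids a full redistribution
- dataStream.global: sends all elements to the same partition (the first instance of the downstream operator)
- dataStream.broadcast: sends every element to every downstream partition
3.2 Custom partitioning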
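import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.readTextFile("filePath")
    .flatMap(_.split(" "))
    .map((_, 1))
    .partitionCustom(new MyPartitioner, _._1) // partition by the word
    .print()
  env.execute()
}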
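// Sends the key "dd" to partition 0 and every other key to partition 1.
// The returned index must be smaller than numPartitions, so the downstream parallelism must be at least 2.
class MyPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    if (key == "dd") 0 else 1
}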
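A plain-Scala sketch (not the Flink runtime) of how elements map to downstream partition indices: a round-robin counter, roughly what rebalance does (the real operator's starting partition may differ), next to the routing rule of MyPartitioner above. Both helper names are hypothetical.

```scala
// Simulates partition assignment for a finite input; not the Flink runtime.
object PartitionDemo {
  // Round-robin: the i-th element goes to partition i % numPartitions (rebalance-like).
  def roundRobin[T](input: Seq[T], numPartitions: Int): Seq[(Int, T)] =
    input.zipWithIndex.map { case (elem, i) => (i % numPartitions, elem) }

  // Same rule as MyPartitioner: "dd" -> 0, everything else -> 1.
  def custom(key: String, numPartitions: Int): Int = {
    val p = if (key == "dd") 0 else 1
    // A valid index must satisfy 0 <= p < numPartitions; fail fast otherwise
    require(p < numPartitions, s"partition $p out of range for $numPartitions partitions")
    p
  }
}
```

Unlike round-robin, the custom rule ignores load: if most words are not "dd", partition 1 receives almost everything.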
4. Side outputs
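import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val oddTag = new OutputTag[Int]("odd")
val evenTag = new OutputTag[Int]("even")
val result = env.fromElements(1, 2, 3, 4, 5, 6)
  .process(new ProcessFunction[Int, Int] {
    override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
      // Route each element to a side output; nothing is emitted to the main output via out.collect
      if (value % 2 == 0) {
        ctx.output(evenTag, value)
      } else {
        ctx.output(oddTag, value)
      }
    }
  })
// Side outputs are read from the resulting stream by their tag
result.getSideOutput(evenTag).print("even")
result.getSideOutput(oddTag).print("odd")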
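Conceptually, one ProcessFunction produces a main output plus any number of tagged side outputs. The plain-Scala simulation below is not Flink (plain strings stand in for the OutputTag objects) and replays the example, showing that the main output stays empty because processElement only calls ctx.output and never out.collect.

```scala
// Simulates routing elements to side outputs by tag; not the Flink API.
object SideOutputDemo {
  final case class Routed(main: Seq[Int], sideOutputs: Map[String, Seq[Int]])

  def process(input: Seq[Int]): Routed = {
    val main = Vector.newBuilder[Int] // would collect out.collect(...) calls; none are made here
    val side = scala.collection.mutable.Map.empty[String, Vector[Int]]
    input.foreach { value =>
      val tag = if (value % 2 == 0) "even" else "odd"
      side(tag) = side.getOrElse(tag, Vector.empty) :+ value // like ctx.output(tag, value)
    }
    Routed(main.result(), side.toMap)
  }
}
```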
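II. User-Defined Functions
5.1 Function classes
Flink provides an interface or abstract class for implementing the UDF of each kind of operator, such as MapFunction, FilterFunction and ProcessFunction.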
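import org.apache.flink.api.common.functions.FilterFunction

// Keeps only elements equal to "flink"
class MyFilter() extends FilterFunction[String] {
  override def filter(t: String): Boolean = t.equals("flink")
}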
dataStream.filter(new MyFilter())
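5.2 RichFunction
Rich functions can access the context of the runtime environment and have lifecycle methods:
- getRuntimeContext(): gets the runtime context, e.g. the function's parallelism, the task name, and state
- setRuntimeContext(): sets the runtime context
- open(): initialization method, called before the operator's processing method (such as map) is invoked
- close(): the last lifecycle method called, used for cleanup work
- Examples: RichMapFunction, RichFlatMapFunction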
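The lifecycle above boils down to a fixed call order per parallel instance: open() once, the processing method once per element, then close() once. The sketch below is a hypothetical mini "runtime" in plain Scala that mimics that order; LifecycleMapper and runTask are illustrative names, not Flink's API (in Flink, open also receives a parameter whose exact type depends on the version).

```scala
// Hypothetical sketch of the rich-function call order; not Flink's API.
object LifecycleDemo {
  trait LifecycleMapper[A, B] {
    def open(): Unit = ()
    def map(value: A): B
    def close(): Unit = ()
  }

  // Calls open() once, map() for every element, then close() once;
  // in this sketch try/finally makes close() run even if map throws.
  def runTask[A, B](fn: LifecycleMapper[A, B], input: Seq[A]): Seq[B] = {
    fn.open()
    try input.map(fn.map)
    finally fn.close()
  }

  // Records the order of lifecycle calls so it can be inspected
  class Recording extends LifecycleMapper[Int, Int] {
    val calls = scala.collection.mutable.ArrayBuffer.empty[String]
    override def open(): Unit = { calls += "open"; () }
    override def map(value: Int): Int = { calls += s"map($value)"; value * 10 }
    override def close(): Unit = { calls += "close"; () }
  }
}
```

Running a Recording over 1 and 2 records open, map(1), map(2), close, which is why open() is the natural place for one-time setup such as creating a connection.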