flink.apache.org Flink是有状态的(sateful):Stateful Computations over Data Streams
起源欧洲,后被阿里收购,才在中国普及。在此之前都是用spark。
Flink也是做客户端,Flink on k8s、Yarn、Mesos,目前还是 Flink on Yarn,以后 on k8s. on k8s.可以实现资源隔离,各个任务不用存在资源抢占。
可以接实时的数据,做流处理;也可以接DB,dfs的数据 ,做批处理。
Flink 更强于做流处理 【DataSet API (Legacy)】,Spark更强于做批处理
Flink 的特点
批流一体 DataSet-批 、DataStream-流。 高吞吐、低延迟、高性能。 真正的流处理,支持基于Event-time的操作。也支持window操作。 支持带状态的sateful的Exactly-Once(*****)。
DataStream
位置: documentation/Application Development/DataStream API DataStream 也是不可变的。
简单上手
可以通过maven快速创建一个fink工程demo
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.14.4 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/datastream_api.html
Example
- 引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
- 批处理
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object BatchWCApp {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val text: DataSet[String] = env.readTextFile("data/wc.txt")
val result = text.flatMap(_.split(","))
.map((_, 1))
.groupBy(0)
.sum(1)
result.print()
}
}
- 流处理
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamingWCApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text: DataStream[String] = env.socketTextStream("gargantua", 9527)
text.flatMap(_.split(","))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print().setParallelism(1)
env.execute(this.getClass.getSimpleName)
4>(pig,1)
4>(dog,2)
4>(dog,3)
}
}
- java 版本的批处理
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> source = env.readTextFile("data/wc.txt");
test02(source);
}
private static void test02(DataSource<String> source) throws Exception {
System.out.println("------------");
FlatMapOperator<String, String> flatMapOperator = source.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(word);
}
}).returns(Types.STRING);
MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapOperator.groupBy(0).sum(1).print();
- java 版本的流处理
…
Flink 的编程模型
Anatomy of a Flink Program
获取执行环境 [val env = getExecutionEnvironment()、createLocalEnvironment()]
加载/创建初始数据 [val input = env.readTextFile("data/wc.txt")、env.socketTextStream("localhost", 9999)]
作用此数据的transformations算子 [input.map { x => x.toInt }...]
指定计算结果的存放位置 [writeAsText(path: String))、print()、writeToSocket()、addSink()]
触发程序执行(流处理) [env.execute(this.getClass.getSimpleName)]
获取执行环境
-
获取批处理执行环境 ,如同SparkConf、SparkContext val env = ExecutionEnvironment.getExecutionEnvironment
-
获取流处理执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment
-
获取环境时可以指定让这个任务展示到Web UI。 引入依赖 <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
获取带Web UI 的 Environment val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
默认启动在8080端口,如果占用就8081依次递增。如果有打开的UI页面提前打开,或前一次没有关闭,但后台重启,会报错。需要把UI关掉再打开。
数据源
基于文件(批处理):
readTextFile()
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter)
基于socket(流处理):
socketTextStream
基于scala/java集合
fromCollection(Seq)
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator)
generateSequence(from, to)
addSource
Transformations
map、 filter 、keyby 、window...
数据输出
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
并行度
获取当前并行度:stream.parallelism
env.socketTextStream 、env.fromCollection(List()) 、fromElements 的时候,并行度是1, 意味着不能并行接收。 且不能自己强制设置并行度,因为源码设置为1,或只要实现SourceFuncation的也都是单并行。
env.readTextFile() 、env.fromParallelCollection()、env.generateSequence()是多并行度读取。
对带并行度的数据源ParallSourceFunction如果不指定并行度,就会使用当前机器的CPU线程数。会把资源占尽。 transformation : fliter、flatmap 等,如果不指定并行度,也是取决于CPU线程。 print():如果不指定并行度,也是取决于CPU线程。
设置并行度
(仅针对能设置并行度时)
env阶段: env.setParallelism(2)
source阶段 : env.addSource(...).setParallelism(2)
transformation阶段: 默认是用完CPU,在生产一般都要自己重新设置。
sink阶段:
自定义数据源 addSource
内置的数据源如fromCollection(),底层也是用的addSource()。
定义数据源 addSource(new xxxSourceFunction())
对于xxxSourceFunction,有很多,都是常用Function的子接口和子类,如可以并行的ParalleSourceFunction、RichSourceFunction、RichParalleSourceFunction
继承自单并行度的自定义sourceFunction只能串行,继承自多并行度的自定义sourceFunction默认并行度为CPU线程数,也可以自己setParallelism。
自定义sourceFunction,需要实现run() 和cancel() 方法。源码有demo参考。
继承自RichFunction(增强)的自定义sourceFunction,除了run() 和cancel() ,还可以实现open()和close()方法。open是每一个并行度执行前都会调用的,如数据需要和mysql关联,可以在open中加载连接。
jdbc 读取数据源到source
继承RichSourceFunction,重写以下方法:在open方法中加载驱动、获取jdbc连接、准备查询语句,在run 方法执行SQL语句,在close中关闭连接释放资源。
kafka 读取数据源到source
导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
新API可以直接使用KafkaSource.builder() 出一个kafkaSource。是多并行度的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = KafkaSource.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setTopics("flinktopic")
.setGroupId("ruozedata_flink_topic_group") // pk-group
.setStartingOffsets(OffsetsInitializer.latest()) // earliest
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print()
Transformations
Map、FlatMap、Filter、KeyBy、Reduce、Window、WindowAll 、Window Apply、WindowReduce、Union、Window Join、Interval Join 、Window CoGroup、Connect、CoMap, CoFlatMap、Iterate
map 是将一个 DataStream转另一个 DataStream .map(_*2)的底层操作是
.transform("my map",new SteamMap(new MapFunction(
override def map(value:Int) = value * 2
))
union: 两个或多个数据源 合并成一个数据源。 union时是必须要相同的数据类型 自己合并自己会得到两倍…
connect:两个数据源,关联成一个,但是内部其实两个流还是独立,知识可以共用State状态信息,且两个流数据类型还可以不一样
connect vs union
1) 合并后:一个流 / 多个流
2) 数据类型:是否一定要相同的数据类型
3) 个数问题:connect的map操作,也是会有两个参数,分开操作
需求:两个流分别做,stream1转大写,stream转Int
stream1.connect(stream2).map(new CoMapFunction[String,Int, String] {
override def map1(value: String): String = value.toUpperCase()
override def map2(value: Int): String = value * 10 + ""
})
分区器 Partitioner
自定义分区器MyPartitioner,继承Partitioner,重写partition方法,在其中指定分区规则。
通过partitionCustom指定使用分区
.partitionCustom(new MyPartitioner, x => x._1)
Sink
Flink03:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/
自定义 Sink。继承RichSinkFunction,重写open方法和invoke方法,在invoke方法中指定具体逻辑输出。
输出到文件
输出到文件系统,由于多并行,会产生很多小文件夹。流处理就不适合输出文件系统。 但是setParallelism(1)时可以输出到一个文件
输出到 kafka
需要引入依赖(和source是同一个)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>
官网提供API可以直接使用KafkaSource.builder() 出一个sink,使用sinkTo输出到kafka
val sink: KafkaSink[String] = KafkaSink.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("flinktopic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build()
输出到 redis
需要引入依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
API只提供了RedisSink,还需要自己实现RedisMapper。
数据到 jdbc
参考官网。
输出到 socket
accessStream.writeToSocket("gargantua", 9526, new SimpleStringSchema())
输出到 nc - lk 9526 的窗口了
|