What is Flink
Concept
Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams (it retains previously computed state). Flink is designed to run in all common cluster environments (YARN, local, and so on), performing computation at in-memory speed and at any scale.
Features
Flink is event-driven: it computes as each record arrives. Spark Streaming, by contrast, is time-driven and processes a micro-batch every few seconds.
- Supports high-throughput, low-latency, high-performance stream processing
- Supports window operations based on event time
- Supports exactly-once semantics for stateful computation (each record affects the state exactly once)
- Supports highly flexible window operations: time-, count-, session-, and data-driven windows (three broad categories, eight window types in total)
- Supports a continuous streaming model with backpressure (if Spark processes a batch every five seconds and one batch is not finished in time, data piles up and causes all kinds of problems; Flink instead slows down the rate at which it consumes incoming data)
- Supports fault tolerance based on lightweight distributed snapshots
- A single runtime supports both batch-on-streaming and pure streaming processing
- Flink implements its own memory management inside the JVM, avoiding OOM errors (Spark is more prone to memory overflow and instability)
- Supports iterative computation
- Supports automatic program optimization: avoids expensive operations such as shuffle and sort where possible, and caches intermediate results when necessary
Flink technology stack
local: run directly in the IDE (right-click Run in IDEA); cluster: standalone mode (Flink itself forms the cluster) or running on a YARN cluster; cloud: cloud platforms such as Alibaba Cloud.
Flink unifies stream and batch processing.
Setting up Flink locally
Maven configuration: this project uses Flink 1.11.2 built for Scala 2.11; besides the Flink dependencies, you only need to add the Scala and Java compiler plugins, since Flink itself is written in Java.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ShuJia01</artifactId>
<groupId>ShuJia</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.36</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Preparation before writing code
1. Before running a Flink program, always import the implicit conversions (import org.apache.flink.streaming.api.scala._ in every program). 2. First wrap the common boilerplate in a base class, then build the later examples on top of it.
package com.shujia.common

import org.apache.flink.streaming.api.scala._

abstract class FlinkTool {
  def main(args: Array[String]): Unit = {
    //create the Flink execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //set the parallelism
    env.setParallelism(2)
    run(env)
    env.execute() //start the Flink job
  }

  def run(env: StreamExecutionEnvironment): Unit
}
3. Set up logging: add a log4j2.properties file under the resources directory of the Flink project so that log messages are printed.
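For reference, a minimal log4j2.properties that simply prints to the console could look like the following; the levels and pattern here are my own choice, not taken from the original project:

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n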
Flink's data processing model
Flink components
A Flink program always has three stages: a source reads the data, transformations convert and compute on it, and a sink writes the result out somewhere.
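To make the three stages concrete, here is a minimal sketch that chains them together, reusing the FlinkTool base class defined above and the socket source on master:8888 used later in this post (the object name Demo0Pipeline is just a placeholder):

import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.scala._

object Demo0Pipeline extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    //source: read lines from a socket
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //transform: split the lines into words and count each word
    val countDS: DataStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    //sink: print the result to stdout
    countDS.print()
  }
}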
source
Flink sources for both stream and batch processing fall roughly into 4 categories:
- collection-based sources,
- file-based sources,
- socket-based sources,
- custom sources. Common custom sources include Apache Kafka, Amazon Kinesis Streams, RabbitMQ, the Twitter Streaming API, Apache NiFi, and so on; you can of course also define your own.
Let's go through them one by one.
Collection-based source
By default Flink derives the parallelism from the number of cores of your machine; here I set the parallelism to 2, so the output is split across two partitions, distributed by hash.
//build a DataStream from a local collection -- bounded stream
import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.scala._

object Demo1Source extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    val linesDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5))
    linesDS.print()
  }
}
File-based source
//build a DataStream from a file -- bounded stream
object Demo1Source extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    val stuDS: DataStream[String] = env.readTextFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
    //count students per class (field 4 is the class name)
    stuDS
      .map(word => (word.split(",")(4), 1))
      .keyBy(_._1)
      .sum(1)
      .print()
  }
}
Socket-based source
Open a port on the VM with nc -lk <port>; if the nc command is not available, install it with yum install nc.
//build a DataStream from a socket -- unbounded stream
object Demo1Source extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)
    socketDS.print()
  }
}
Custom source
A first custom source
The data produced by a source is what the downstream operators work on, so it has to be sent downstream. To write a custom source you extend SourceFunction and override the run and cancel methods (cancel can be left empty); note that the type parameter must be specified.
package com.shujia.source

import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo1Source extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    val myDS: DataStream[Int] = env.addSource(new MySource)
    myDS.print()
  }

  class MySource extends SourceFunction[Int] {
    override def run(sc: SourceFunction.SourceContext[Int]): Unit = {
      var i = 0
      while (true) {
        //send the data downstream
        sc.collect(i)
        Thread.sleep(100)
        i += 1
      }
    }

    override def cancel(): Unit = {}
  }
}
Custom MySQL source, four different modes
A custom data source extends SourceFunction; here are the four common variants (a sketch of a parallel source follows the RichSourceFunction example below):
- extend SourceFunction: a single, non-parallel source; run is executed only once
- extend ParallelSourceFunction: a parallel source; there are as many source instances as the parallelism
- extend RichSourceFunction: adds open and close methods
- extend RichParallelSourceFunction: parallel, plus the open and close methods
Extending SourceFunction
A single source whose run method is executed only once. Written this way, however, a new connection is created every time MySQL is queried, which wastes resources.
package com.shujia.source

import java.sql._
import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo2MysqlSource extends FlinkTool {
  override def run(env: StreamExecutionEnvironment): Unit = {
    val mysqlDS: DataStream[(Int, String, Int, String, String, String)] = env.addSource(new MysqlSource)
    mysqlDS.print()
  }
}

class MysqlSource extends SourceFunction[(Int, String, Int, String, String, String)] {
  override def run(sourceContext: SourceFunction.SourceContext[(Int, String, Int, String, String, String)]): Unit = {
    //load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    //open the connection
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456")
    val statement: PreparedStatement = conn.prepareStatement("select * from student limit 10")
    val resultSet: ResultSet = statement.executeQuery()
    while (resultSet.next()) {
      val id: Int = resultSet.getInt("id")
      val name: String = resultSet.getString("name")
      val age: Int = resultSet.getInt("age")
      val gender: String = resultSet.getString("gender")
      val clazz: String = resultSet.getString("clazz")
      val last_mod: String = resultSet.getString("last_mod")
      //send the record downstream
      sourceContext.collect((id, name, age, gender, clazz, last_mod))
    }
    conn.close()
  }

  override def cancel(): Unit = {}
}
Extending RichSourceFunction
RichSourceFunction adds open and close methods, so the connection can be created once in open and closed in close, which saves resources.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class MysqlSource extends RichSourceFunction[(Int, String, Int, String, String, String)] {
  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    //load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    //open the connection once per task
    conn = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456")
  }

  override def run(sourceContext: SourceFunction.SourceContext[(Int, String, Int, String, String, String)]): Unit = {
    val statement: PreparedStatement = conn.prepareStatement("select * from student limit 10")
    val resultSet: ResultSet = statement.executeQuery()
    while (resultSet.next()) {
      val id: Int = resultSet.getInt("id")
      val name: String = resultSet.getString("name")
      val age: Int = resultSet.getInt("age")
      val gender: String = resultSet.getString("gender")
      val clazz: String = resultSet.getString("clazz")
      val last_mod: String = resultSet.getString("last_mod")
      //send the record downstream
      sourceContext.collect((id, name, age, gender, clazz, last_mod))
    }
  }

  override def close(): Unit = {
    conn.close()
  }

  override def cancel(): Unit = {}
}
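The list above also mentions ParallelSourceFunction and RichParallelSourceFunction. With those, run is executed once per parallel subtask, so with parallelism 2 there are two independent source instances. The original examples do not include code for them; the following is only a small sketch of what a RichParallelSourceFunction might look like (the class name MyParallelSource and the emitted strings are made up for illustration):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class MyParallelSource extends RichParallelSourceFunction[String] {
  @volatile private var running = true

  override def open(parameters: Configuration): Unit = {
    //one-time setup per subtask, e.g. opening a connection
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    //each parallel subtask runs its own copy of this loop
    val subtask: Int = getRuntimeContext.getIndexOfThisSubtask
    var i = 0
    while (running) {
      ctx.collect(s"subtask-$subtask -> $i")
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  override def close(): Unit = {
    //release anything opened in open()
  }
}

It is used the same way as before: env.addSource(new MyParallelSource).print()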
transform
Flink operators are not lazy in the Spark sense: you do not need an action operator to trigger them; every operator in the pipeline runs once the job is started with env.execute().
map
The map operator can be given a function in two ways: you can pass a Scala function, or a Java-style function object.
Passing a Scala function
Passing a Scala function works just as before:
linesDS.map(
line=>{
line+"scala"
}
).print()
Passing a Java function
Implementing MapFunction
For the Java-style variant you implement MapFunction, which takes two type parameters: the input type and the output type.
linesDS.map(new MapFunction[String,String] {
override def map(line: String): String = {
line+"java"
}
}).print()
Implementing RichMapFunction (adds open and close)
linesDS.map(new RichMapFunction[String,String] {
override def open(parameters: Configuration): Unit = {
}
override def map(line: String): String = {
line+"java"
}
override def close(): Unit = {
}
})
The effect is the same.
flatMap
Passing a Scala function
val linesDS: DataStream[String] = env.socketTextStream("master",8888)
linesDS.flatMap(line=>{
line
.split(",")
}).print()
Passing a Java function
linesDS.flatMap(new RichFlatMapFunction[String,String] {
override def open(parameters: Configuration): Unit = {
println("open")
}
override def close(): Unit = {
println("close")
}
override def flatMap(in: String, collector: Collector[String]): Unit = {
in
.split(",")
.foreach(word=>{
collector.collect(word)
})
}
}).print()
Filter
val stuDS: DataStream[String] = env.readTextFile("data/students.txt")
stuDS.filter(new FilterFunction[String] {
override def filter(t: String): Boolean = {
println("filter")
t.split(",")(3) == "男"
}
}).print()
KeyBy
//send records with the same key to the same task
val stuDS: DataStream[String] = env.readTextFile("data/students.txt")
stuDS.keyBy(new KeySelector[String,String] {
override def getKey(in: String): String = {
in
}
}).print()
Reduce
Used to aggregate the data after a keyBy.
val linesDS: DataStream[String] = env.socketTextStream("master",8888)
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_,1))
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
/**
 * reduce: aggregate the data after keyBy
 */
keyByDS.reduce(new ReduceFunction[(String, Int)] {
override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
(t1._1,t1._2+t._2)
}
}).print()
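As with map, the same aggregation can also be written with a plain Scala function instead of a ReduceFunction; an equivalent one-liner (my own shorthand, not from the original) would be:

keyByDS.reduce((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)).print()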
Agg
package com.shujia.transformation
import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.scala._
object Demo6Agg extends FlinkTool{
override def run(env: StreamExecutionEnvironment): Unit = {
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
val stuDS: DataStream[Student] = studentDS.map(line => {
val split: Array[String] = line.split(",")
Student(split(0), split(1), split(2).toInt, split(3), split(4))
})
stuDS
.keyBy(_.clazz)
.sum("age")
// .print()
/**
 * The difference between max and maxBy: max only updates the aggregated field
 * (the other fields may still come from an earlier record), while maxBy returns
 * the whole element that contains the maximum value.
 */
stuDS
.keyBy(_.clazz)
.maxBy("age")
.print()
}
}
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
Window
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
/**
 * Count the number of each word every 5 seconds
 */
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
val countDS: DataStream[(String, Int)] = kvDS
.keyBy(_._1)
//a 5-second window
.timeWindow(Time.seconds(5))
.sum(1)
countDS.print()
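The feature list at the top also mentions count and session windows. The example above is a time window; sketches of a count window and a session window on the same keyed stream (reusing kvDS and Time from above) might look like the following. The window size of 10 elements and the 5-second gap are arbitrary choices for illustration:

import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows

//count window: fire once 10 elements have arrived for a key
kvDS
  .keyBy(_._1)
  .countWindow(10)
  .sum(1)
  .print()

//session window: close a key's window after 5 seconds without new data
kvDS
  .keyBy(_._1)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
  .sum(1)
  .print()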
union
val ds1: DataStream[Int] = env.fromCollection(List(1,2,3,4,5,6))
val ds2: DataStream[Int] = env.fromCollection(List(4,5,6,7,8,9))
/**
 * Merge two DataStreams; the element types must be the same
 */
val unionDS: DataStream[Int] = ds1.union(ds2)
unionDS.print()
SideOutput
Side output: route records into additional labelled output streams.
val studentsDS: DataStream[String] = env.readTextFile("data/students.txt")
/**
 * Separate the male and female students into their own streams
 */
val nan: OutputTag[String] = OutputTag[String]("男")
val nv: OutputTag[String] = OutputTag[String]("女")
val processDS: DataStream[String] = studentsDS.process(new ProcessFunction[String, String] {
override def processElement(line: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
val gender: String = line.split(",")(3)
gender match {
//side output
case "男" => ctx.output(nan, line)
case "女" => ctx.output(nv, line)
}
}
})
//get the side-output DataStreams
val nanDS: DataStream[String] = processDS.getSideOutput(nan)
val nvDS: DataStream[String] = processDS.getSideOutput(nv)
nvDS.print()
sink
Custom sink
package com.shujia.sink
import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
object Demo1Sink extends FlinkTool{
override def run(env: StreamExecutionEnvironment): Unit = {
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
// studentDS.print()
/**
 * custom sink
 */
studentDS.addSink(new MySink)
}
}
class MySink extends SinkFunction[String]{
/**
 * invoke: executed once for every record
 *
 * @param line    the record
 * @param context the context object
 */
override def invoke(line: String, context: SinkFunction.Context[_]): Unit = {
println(line)
}
}
Custom sink writing to MySQL
package com.shujia.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.shujia.common.FlinkTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
object Demo2MysqlSink extends FlinkTool{
override def run(env: StreamExecutionEnvironment): Unit = {
val studentDS: DataStream[String] = env.readTextFile("data/students.txt")
studentDS.addSink(new MysqlSink)
}
}
class MysqlSink extends RichSinkFunction[String] {
var con: Connection = _
/**
 * Executed before invoke, only once per task
 */
override def open(parameters: Configuration): Unit = {
println("创建连接")
//加载驱动
Class.forName("com.mysql.jdbc.Driver")
//1、建立链接
con = DriverManager.getConnection("jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8", "root", "123456")
}
override def close(): Unit = {
println("关闭连接")
con.close()
}
/**
 * Executed once for every record
 */
override def invoke(line: String, context: SinkFunction.Context[_]): Unit = {
val split: Array[String] = line.split(",")
val stat: PreparedStatement = con.prepareStatement("insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")
stat.setString(1, split(0))
stat.setString(2, split(1))
stat.setInt(3, split(2).toInt)
stat.setString(4, split(3))
stat.setString(5, split(4))
stat.execute()
}
}
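Following the same idea as the RichSourceFunction example earlier (create expensive resources once in open), the PreparedStatement could also be prepared once per task and reused in invoke. A sketch of that variant; the class name MysqlPreparedSink is made up, everything else mirrors the code above:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MysqlPreparedSink extends RichSinkFunction[String] {
  var con: Connection = _
  var stat: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    con = DriverManager.getConnection("jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8", "root", "123456")
    //prepare the statement once and reuse it for every record in this task
    stat = con.prepareStatement("insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")
  }

  override def invoke(line: String, context: SinkFunction.Context[_]): Unit = {
    val split: Array[String] = line.split(",")
    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))
    stat.execute()
  }

  override def close(): Unit = {
    stat.close()
    con.close()
  }
}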
Thanks for reading. I'm 啊帅和和, a fourth-year big data major. Wishing you happiness.