IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink入门篇 -> 正文阅读

[大数据]Flink入门篇

什么是Flink

概念

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算会保留之前的状态)。Flink设计为在所有常见的集群环境(yarn、本地等都可以)中运行,以内存速度和任何规模执行计算。
在这里插入图片描述

特点

在这里插入图片描述
flink是事件驱动应用(来一个数据就计算,来一个就计算)
spark是时间驱动,几秒处理一次

  • 支持高吞吐、低延迟、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算的Exactly-once语义(只处理一次)
  • 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作,三个大类,八种小类窗口
  • 支持具有反压功能的持续流模型(比如spark假如是五秒处理一次,假如一个五秒数据没有处理完成,数据会积压,会造成各种问题,但flink没有处理完数据,会减缓数据来的速度)
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持Batch on Streaming处理和Streaming处理
  • Flink在JVM内部实现了自己的内存管理,避免了出现oom(spark容易内存溢出,不稳定)
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

Flink技术栈

local:idea中右键运行
cluster:standalone自己就是一个集群的模式、yarn集群运行模式
cloud:云,比如阿里云之类的

flink实现了流批统一
在这里插入图片描述

本地安装flink

加载配置文件:
我这里安装的是2.11版本,额外只需要加上scala和Java的编译环境即可,flink就是Java写的

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>ShuJia01</artifactId>
        <groupId>ShuJia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>




        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>


</project>

写代码之前的准备工作

1、在运行flink程序代码之前,一定要导入隐式转换(每一次写程序都要导入这个
在这里插入图片描述
2、先对共同的代码做一个封装,再写之后的代码

abstract class FlinkTool {
  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度
    env.setParallelism(2)
    run(env)
    env.execute()//启动flink
  }
  def run(env:StreamExecutionEnvironment)
}

3、做一个日志的打印
在flink项目的resources中加上log4j2.properties的配置,打印日志信息
在这里插入图片描述

Flink处理数据模型

在这里插入图片描述

Flink组件

flink的流程一共三步,先是source读取数据,再进行transform转换和计算,最后就是sink输出数据到某个地方

source

Flink 在流处理和批处理上的 source 大概有 4 类:

  • 基于本地集合的 source、
  • 基于文件的 source、
  • 基于网络套接字的 source、
  • 自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
    我们一个个来看

基于本地集合的source

flink会默认根据你电脑的核数来确定并行度,我这里设置了我的并行度为两个,所以这里的结果会将数据默认分到两个分区里面,采用的是hash分区的方式

//基于本地集合构建DataStream  -- 有界流
object Demo1Source extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {
    val linesDS: DataStream[Int] = env.fromCollection(List(1,2,3,4,5))

    linesDS.print()
}
}

在这里插入图片描述

基于文件的 source

//基于文件构建DataStream -- 有界流
object Demo1Source extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = 
    val stuDS: DataStream[String] = env.readTextFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
    stuDS
      .map(word=>(word.split(",")(4),1))
      .keyBy(_._1)
      .sum(1)
      .print()
  }
}

在这里插入图片描述

基于网络套接字的 source

这里可以在虚拟机里面开启一个端口
nc -lk 端口号,没有nc命令可以 yum install nc 下载一个

//基于socket构建DataStream-- 无界流
object Demo1Source extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {
    val socketDS: DataStream[String] = env.socketTextStream("master",8888)
    socketDS.print()
  }
}

在这里插入图片描述

自定义的 source

自定义source尝试

source的数据是未了后面的算子做处理做准备的,所以这里的数据要发送到下游
使用自定义的source,只需要继承SourceFunction,重写run和cancel方法即可,cacel方法可以不用写东西,这里要注意加上泛型
在这里插入图片描述

//自定义source, 实现SourceFunction接口
//实现run方法
package com.shujia.source

import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo1Source extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {
   val myDS: DataStream[Int] = env.addSource(new MySource)
   myDS.print()
  }
class MySource extends SourceFunction[Int]{
  //run方法只执行一次
  override def run(sc: SourceFunction.SourceContext[Int]): Unit = {
    var i = 0
    while (true){
      sc.collect(i) //将数据发到下游
      Thread.sleep(100)
      i+= 1
    }
  }
  //任务取消的时候执行,用于回收资源
  override def cancel(): Unit = {}
}

在这里插入图片描述

自定义MySQL数据源接入,四种不同模式

在写自定义数据源的时候,需要继承SourceFunction,这里记录四种常见的的方法

继承SourceFunction - 单一的source , run方法只会执行一次
继承ParallelSourceFunction - 并行的source ,有多少个并行度就会有多少个source
继承RichSourceFunction 多了open和close方法
继承RichParallelSourceFunction 多了并行度和open和close方法

在这里插入图片描述

继承SourceFunction

单一的source , run方法只会执行一次,但是这样每调用一次MySQL都会创建一次连接,非常消耗资源

package com.shujia.source

import java.sql._

import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo2MysqlSource extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {
    val mysqlDS: DataStream[(Int, String, Int, String, String, String)] = env.addSource(new MysqlSource)
    mysqlDS.print()
  }
}

class MysqlSource extends SourceFunction[(Int,String,Int,String,String,String)]{
  override def run(sourceContext: SourceFunction.SourceContext[(Int,String,Int,String,String,String)]): Unit = {
    //加载驱动
    Class.forName("com.mysql.jdbc.Driver")

    //建立连接
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student","root","123456")

    val statement: PreparedStatement = conn.prepareStatement("select * from student limit 10")

    val resultSet: ResultSet = statement.executeQuery()

    while (resultSet.next()){
      val id: Int = resultSet.getInt("id")
      val name: String = resultSet.getString("name")
      val age: Int = resultSet.getInt("age")
      val gender: String = resultSet.getString("gender")
      val clazz: String = resultSet.getString("clazz")
      val last_mod: String = resultSet.getString("last_mod")

      //将数据发送到下游
      sourceContext.collect((id,name,age,gender,clazz,last_mod))
    }

    conn.close()
  }

  override def cancel(): Unit = {}
}

在这里插入图片描述

继承RichSourceFunction

多了open和close方法,这样就可以在open方法中创建一次连接,在close中关闭,节省资源

class MysqlSource extends RichSourceFunction[(Int,String,Int,String,String,String)]{
  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    //加载驱动
    Class.forName("com.mysql.jdbc.Driver")

    //建立连接
    conn = DriverManager.getConnection("jdbc:mysql://master:3306/student","root","123456")

  }

  override def run(sourceContext: SourceFunction.SourceContext[(Int,String,Int,String,String,String)]): Unit = {

    val statement: PreparedStatement = conn.prepareStatement("select * from student limit 10")

    val resultSet: ResultSet = statement.executeQuery()

    while (resultSet.next()){
      val id: Int = resultSet.getInt("id")
      val name: String = resultSet.getString("name")
      val age: Int = resultSet.getInt("age")
      val gender: String = resultSet.getString("gender")
      val clazz: String = resultSet.getString("clazz")
      val last_mod: String = resultSet.getString("last_mod")

      //将数据发送到下游
      sourceContext.collect((id,name,age,gender,clazz,last_mod))
    }
  }

  override def close(): Unit = {
    conn.close()
  }
  
  override def cancel(): Unit = {}
}

在这里插入图片描述

transform

flink的算子不是懒执行的算子,不管怎么样都会执行的,不需要行动算子也会执行

map

在这里插入图片描述
我们可以看到map函数,里面有两种方式,都是传入一个函数,只不过一个是传入Scala写的函数,一个是传入Java函数

传入Scala函数

传入Scala函数:和之前一样的操作即可

linesDS.map(
      line=>{
        line+"scala"
      }
    ).print()

传入Java函数

继承MapFunction

传入Java函数,我们可以看到这里只要继承MapFunction即可,这里面有两个参数,一个是传入的参数类型,一个是输出的参数类型
在这里插入图片描述

linesDS.map(new MapFunction[String,String] {
      override def map(line: String): String = {
        line+"java"
      }
    }).print()
继承RichMapFunction (可以做open和close)
linesDS.map(new RichMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {

      }

      override def map(line: String): String = {
        line+"java"
      }

      override def close(): Unit = {

      }
    })

效果相同
在这里插入图片描述

flatMap

传入Scala函数

val linesDS: DataStream[String] = env.socketTextStream("master",8888)
linesDS.flatMap(line=>{
  line
    .split(",")
}).print()

在这里插入图片描述
在这里插入图片描述

传入Java函数

linesDS.flatMap(new RichFlatMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {
        println("open")
      }

      override def close(): Unit = {
        println("close")
      }

      override def flatMap(in: String, collector: Collector[String]): Unit = {
        in
          .split(",")
          .foreach(word=>{
            collector.collect(word)
          })
      }

    }).print()

在这里插入图片描述

Filter

val stuDS: DataStream[String] = env.readTextFile("data/students.txt")

    stuDS.filter(new FilterFunction[String] {
      override def filter(t: String): Boolean = {
        println("filter")
        t.split(",")(3) == "男" //将性别为男的过滤出来
      }
    })print()

KeyBy

 //将相同的key发送到同一个task中
    val stuDS: DataStream[String] = env.readTextFile("data/students.txt")

    stuDS.keyBy(new KeySelector[String,String] {
      override def getKey(in: String): String = {
        in
      }
    }).print()

Reduce

用于KeyBy之后对数据进行聚合

val linesDS: DataStream[String] = env.socketTextStream("master",8888)

    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_,1))

    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    /**
      * reduce:keyby之后对数据进行聚合
      */
    keyByDS.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
        (t1._1,t1._2+t._2)
      }
    }).print()

在这里插入图片描述

Agg

package com.shujia.transformation


import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.scala._


object Demo6Agg extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {
    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")


    val stuDS: DataStream[Student] = studentDS.map(line => {
      val split: Array[String] = line.split(",")
      Student(split(0), split(1), split(2).toInt, split(3), split(4))
    })


    stuDS
      .keyBy(_.clazz)
      .sum("age")
    // .print()


    /**
      * max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键,
      */
    stuDS
      .keyBy(_.clazz)
      .maxBy("age")
      .print()
  }

}
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)

Window

val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    /**
      * 每个5秒统计一次单词的数量
      *
      */


    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    val countDS: DataStream[(String, Int)] = kvDS
      .keyBy(_._1)
      //5秒一个窗口
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

union

val ds1: DataStream[Int] = env.fromCollection(List(1,2,3,4,5,6))
    val ds2: DataStream[Int] = env.fromCollection(List(4,5,6,7,8,9))

    /**
      * 合并DataStream  类型要一致
      */
    val unionDS: DataStream[Int] = ds1.union(ds2)

    unionDS.print()

SideOutput

旁路输出

val studentsDS: DataStream[String] = env.readTextFile("data/students.txt")

    /**
      * 将性别为男和性别为女学生单独拿出来
      *
      */

    val nan: OutputTag[String] = OutputTag[String]("男")
    val nv: OutputTag[String] = OutputTag[String]("女")


    val processDS: DataStream[String] = studentsDS.process(new ProcessFunction[String, String] {
      override def processElement(line: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {


        val gender: String = line.split(",")(3)

        gender match {
          //旁路输出
          case "男" => ctx.output(nan, line)
          case "女" => ctx.output(nv, line)
        }
      }
    })


    //获取旁路输出的DataStream

    val nanDS: DataStream[String] = processDS.getSideOutput(nan)
    val nvDS: DataStream[String] = processDS.getSideOutput(nv)


    nvDS.print()

sink

自定义输出

package com.shujia.sink

import com.shujia.common.FlinkTool
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

object Demo1Sink extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    //    studentDS.print()

    /**
      * 自定义sink
      *
      */

    studentDS.addSink(new MySink)
  }
}

class MySink extends SinkFunction[String]{
  /**
    * invoke : 每一条数据都会执行一次
    *
    * @param line    数据
    * @param context 上下文对象
    */
  override def invoke(line: String, context: SinkFunction.Context[_]): Unit = {
    println(line)
  }
}

自定义输出到MySQL

package com.shujia.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.shujia.common.FlinkTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object Demo2MysqlSink extends FlinkTool{
  override def run(env: StreamExecutionEnvironment): Unit = {

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    studentDS.addSink(new MysqlSink)

  }

}

class MysqlSink extends RichSinkFunction[String] {


  var con: Connection = _

  /**
    * 在invoke 之前执行,每一个task中只只一次
    *
    */
  override def open(parameters: Configuration): Unit = {
    println("创建连接")
    //加载驱动
    Class.forName("com.mysql.jdbc.Driver")

    //1、建立链接
    con = DriverManager.getConnection("jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8", "root", "123456")

  }

  override def close(): Unit = {
    println("关闭连接")
    con.close()
  }

  /**
    * 每一条数据都会执行一次
    *
    */
  override def invoke(line: String, context: SinkFunction.Context[_]): Unit = {

    val split: Array[String] = line.split(",")


    val stat: PreparedStatement = con.prepareStatement("insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")

    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))

    stat.execute()

  }
}

在这里插入图片描述

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2021-11-25 08:11:03  更:2021-11-25 08:13:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:50:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码