IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink批处理一 -> 正文阅读

[大数据]flink批处理一

Flink批处理开发

Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示:
Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:
基于本地集合的 source
基于文件的 source
基于网络套接字的 source
自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter
Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window
/ WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。
Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几
类:
写入文件、
打印输出、
写入 socket 、
自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache
Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。

Flink批处理DataSource

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即
实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个
Data Sources 就是数据的来源地。
Flink在批处理中常见的source主要有两大类。
基于本地集合的source(Collection-based-source)
基于文件的source(File-based-source)
基于本地集合的Source
在Flink中最常见的创建本地集合的DataSet方式有三种。
1.使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
2.使用env.fromCollection(),这种方式支持多种Collection的具体类型。
3.使用env.generateSequence(),这种方法创建基于Sequence的DataSet。

读取本地文件
package com.ccj.pxj.heima
import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object BatchDemo {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()
    val ds5 = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds5.print()
    var ds6=env.fromCollection(List("spark", "flink"))
    var ds7=env.fromCollection(ListBuffer("spark", "flink"))
    var ds8=env.fromCollection(Vector("spark", "flink"))
    var ds9=env.fromCollection(mutable.Queue("spark", "flink"))
    var ds10=env.fromCollection(mutable.Stack("spark", "flink"))
    var ds11=env.fromCollection(Stream("spark", "flink"))
    var ds12=env.fromCollection(Seq("spark", "flink"))
    var ds13=env.fromCollection(Set("spark", "flink"))
    var ds14=env.fromCollection(Iterable("spark", "flink"))
    val ds15=env.fromCollection(mutable.ArraySeq("spark", "flink"))
    val ds16=env.fromCollection(mutable.ArrayStack("spark", "flink"))
    val ds17=env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    val ds18=env.fromCollection(Range(1, 9))
    val ds19=env.generateSequence(1, 9)
    ds6.print()
    ds7.print()
    ds8.print()
    ds9.print()
    ds10.print()
    ds11.print()
    ds12.print()
    ds13.print()
    ds14.print()
    ds15.print()
    ds16.print()
    ds17.print()
    ds18.print()
    ds19.print()
  }
}

读取HDFS文件数据

package com.ccj.pxj.heima
import org.apache.flink.api.scala._
object BatchHDFFromFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.readTextFile("hdfs://pxj60:9000//data/hive/mulit_file/1.txt").print()
  }
}
读取CSV文件数据
package com.ccj.pxj.heima
import org.apache.flink.api.scala._
object BatchFromCsvFile {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val dast: DataSet[Score] = env.readCsvFile[Score]("./data/score.csv")
    dast.print()
  }
}
case class Score(id:Int,name:String,subjectId:Int,score:Double)
读取压缩文件
package com.ccj.pxj.heima
import org.apache.flink.api.scala._
object BatchFromCompressFile {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment= ExecutionEnvironment.getExecutionEnvironment
    val ee: DataSet[String] = env.readTextFile("E:\\cyrus-sasl-2.1.27.tar.gz")
    ee.print()
  }
}
遍历目录
package com.ccj.pxj.heima
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
object BatchFromFolder{
  def main(args: Array[String]): Unit = {
     //初始化环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     // recursive.file.enumeration 开启递归
    val conf: Configuration = new Configuration
    conf.setBoolean("recursive.file.enumeration",true)
    env.readTextFile("G:\\apache-maven-3.6.3").withParameters(conf).print()
  }
}

作者:pxj
日期:2021-07-24 14:07:36

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-25 11:45:16  更:2021-07-25 11:46:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/30 18:42:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码