IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink / Scala - DataSource 之 DataStream 获取数据总结 -> 正文阅读

[大数据]Flink / Scala - DataSource 之 DataStream 获取数据总结

一.引言

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。和之前相同,一个 DataStrea 的处理主要包含 Source + Transformation + Sink 的组合:

?Tips:

与之前不同的是,DataSet 的执行环境为:

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

DataStreaming 的执行环境为:

    val env = StreamExecutionEnvironment.getExecutionEnvironment

?

二.FileBased 基于文件

这里大部分接口与 DataSet 类似,由于 env 的不同,得到的最终类型也不同,由 DataSet 变为了 DataStreaming

1.readTextFile(path)

读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

    val textLines = env.readTextFile("path")

2.readFile(fileInputFormat, path)

按照指定的文件输入格式读取(一次)文件。

    class selfFileInputFormat() extends FileInputFormat[String] {
      override def reachedEnd(): Boolean = ???

      override def nextRecord(ot: String): String = ???
    }

    val dataStream = env.readFile(new selfFileInputFormat(), "")

3.readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

上述两个方法是基于 API 直接调用的,底层调用函数为该函数,该方法它基于给定的?fileInputFormat?读取路径?path?上的文件。根据提供的?watchType?的不同,source 可能定期(每?interval?毫秒)监控路径上的新数据。

Tips:

在底层,Flink 将文件读取过程拆分为两个子任务,即?目录监控?和?数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于?watchType),找到要处理的文件,将它们划分为?分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

FileProcessingMode.PROCESS_CONTINUOUSLY

当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

FileProcessingMode.PROCESS_ONCE

source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 自定义 TextFormat
    class selfFileInputFormat() extends FileInputFormat[String] {
      override def reachedEnd(): Boolean = ???

      override def nextRecord(ot: String): String = ???
    }
    // 隐函数 typeInfo
    implicit val typeInfo = TypeInformation.of(classOf[String])
    // 读取模式 watchType
    val watchType = FileProcessingMode.PROCESS_CONTINUOUSLY
    // 文件过滤 fileFilter 也可以加入过滤正在处理的文件逻辑
    val filePathFilter = new FilePathFilter {
      override def filterPath(filePath: Path): Boolean = {
        filePath.getPath.endsWith(".txt")
      }
    }

    val dataStream = env.readFile(new selfFileInputFormat(), "", watchType, 60L, filePathFilter)

三.Collection-Based

1.fromCollection(Collection)

从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。当然,使用 scala 的转换后,scala 对应的 collection 也可以使用,这里使用方法和 DataSet 类似。

    val dataStream: DataStream[String] = env.fromCollection(Array("spark", "flink"))

2.fromCollection(Iterator, Class)

从迭代器获取,class 参数指定返回值返回元素的数据类型

    val dataStream: DataStream[String] = env.fromCollection(Iterator("spark", "flink"))

3.fromElements(T ...)

从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

    val dataStream: DataStream[String] = env.fromElements("spark", "flink")

4.fromParellelCollection(SplittableIterator, Class)

?从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。

    val itSequence = new NumberSequenceIterator(0, 100)
    val dataStream = env.fromParallelCollection(itSequence)

5.generateSequence(from, to)

val numbers = env.generateSequence(1, 10000000)

四.Socket-Based

从 Socket 读取。元素可以由分隔符分隔。

1.启动 Socket

在本地 terminal 执行下列语句并输入一下字符:

nc -lk 9999

?

2.读取 Socket

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("localhost", 9999)

      val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")

?上面是使用 keyBy 对 5S 滚动窗口内的单词进行 wordCount 的实例,下面是输出结果:

3> (hello,1)
5> (world,1)

持续输入一些单词程序会每5s统计一个窗口内的 wordCount。

五.AddSource

1.官方 API

上一边文章提到了 Flink 支持的外部 API 以及对应支持的运行方式,下述 Connector 类别中支持 source 的均可以调用官方 API 和 Maven 依赖进行数据读取与加载生成 DataStream。

Connector 类别支持方式
Apache Kafkasource/sink
Apache Cassandrasink
Amazon Kinesis Streamssource/sink
Elasticsearchsink
FileSystemsink
RabbitMQsource/sink
Google PubSubsource/sink
Hybrid Source?source
Apache NiFi?source/sink
Apache Pulsarsource
Twitter Streaming APIsource
JDBCsink

2. Self-Defined

自定义数据源需要继承 RichSourceFunction[T] 并定义数据类型 T,?主要实现 run 方法 - 获取数据与 cancel 方法 - 停止获取数据,这里和 Spark-Streaming 自定义 receiver,Storm 自定义实现 spout 类似,下面例子将以1s为间隔持续从文本中读取新内容并输出:

    class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }
    }

    val dataStream = env.addSource(new SourceFromFile()).print()

?

六.总结

结合上一篇?Flink / Scala - DataSource 之 DataSet 获取数据总结,Flink 两种数据结构的获取 - DataSet / DataStream 就都介绍完了,作为流式处理引擎,Flink 更擅长于处理 DataStream 流式数据,后续也会介绍更多的流式数据处理方法。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:34:18  更:2022-03-08 22:35:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:58:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码