IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark-SparkStreaming相关内容 -> 正文阅读

[大数据]Spark-SparkStreaming相关内容

Spark-Streaming

数据处理的方式:

流式(Streaming)数据处理,来一条处理一条

批量(batch)数据处理,一次处理一批

数据处理延迟的长短:

实时数据处理:毫秒级别

离线数据处理:小时or天级别

Spark-core和Spark-SQL都是离线数据处理,Spark-Streaming是准实时(秒,分钟),微批次(时间)的数据处理框架。

概述

Spark Streaming 用于流式数据的处理。 Spark Streaming 支持的数据输入源很多,例如: Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语(就是方法,比如Spark的算子等)如:map、reduce、join、 window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

Spark 流使得构建可扩展的容错流应用程序变得更加容易。

在这里插入图片描述

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

特点:易用、容错、易整合到 Spark 体系

整体架构图

SparkStreaming 架构图

在这里插入图片描述

一般这种实时处理的框架都是7*24小时长期运行的。
在这里插入图片描述

背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。

通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值
false,即不启用。

Dstream入门

WordCount 案例实操

需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

增加以下依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
object TestWordCount {

    def main(args: Array[String]): Unit = {
        // 创建环境对象
        // StreamingContext创建时,需要传递两个参数
        // 第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        // 第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        // 获取端口数据
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

        val wordToCount: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(_ -> 1).reduceByKey(_ + _)

        wordToCount.print()


        // 由于SparkStream采集器是长期执行的任务,所以不能直接关闭
        // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
        // ssc.stop()

        // 1、启动采集器
        ssc.start()
        // 2、等待采集器的关闭
        ssc.awaitTermination()
    }
}

// 结果如下
-------------------------------------------
Time: 1628663454000 ms
-------------------------------------------

-------------------------------------------
Time: 1628663457000 ms
-------------------------------------------
(word,1)
(hello,1)

-------------------------------------------
Time: 1628663460000 ms
-------------------------------------------
(a,8)
(aa,1)

-------------------------------------------
Time: 1628663463000 ms
-------------------------------------------
(a,1)

// 发送数据如下:
lxj@lxj:~$ nc -lp 9999
hello word
a
a
a
a
a
a
a
a
aa
a

WordCount 解析

Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

在这里插入图片描述

对数据的操作也是按照 RDD 为单位来进行的

在这里插入图片描述

计算过程由 Spark Engine 来完成

在这里插入图片描述

DStream的创建

RDD 队列

**测试过程中,**可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

案例实操

需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算WordCount

object TestWordCountQueue {

    def main(args: Array[String]): Unit = {
        // 创建环境对象
        // StreamingContext创建时,需要传递两个参数
        // 第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        // 第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        // 创建 RDD 队列
        val rddQueue = new mutable.Queue[RDD[Int]]()

        // 创建 QueueInputDStream
        val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)

        // 处理队列中的 RDD 数据
        val mappedStream = inputStream.map((_,1))
        val reducedStream = mappedStream.reduceByKey(_ + _)

        // 打印结果
        reducedStream.print()


        // 由于SparkStream采集器是长期执行的任务,所以不能直接关闭
        // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
        // ssc.stop()

        // 1、启动采集器
        ssc.start()

        // 循环创建并向 RDD 队列中放入 RDD
        for (i <- 1 to 5) {
            rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
            Thread.sleep(2000)
        }

        // 2、等待采集器的关闭
        ssc.awaitTermination()
    }
}

// 结果如下
-------------------------------------------
Time: 1628664120000 ms
-------------------------------------------
(96,1)
(150,1)
(180,1)
(156,1)
(216,1)
(66,1)
(54,1)
(138,1)
(222,1)
(30,1)
...

自定义数据源

需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

案例实操

需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

object TestDIY {

    def main(args: Array[String]): Unit = {

        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
        messageDS.print()

        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * 自定义数据采集器
     * 1、继承Receiver,定义泛型,传递参数
     * 2、重写方法
     */
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

        private var flg = true;

        override def onStart(): Unit = {
            new Thread(() => {
                while (flg) {
                    val message = "采集数据为:" + new Random().nextInt(10).toString

                    store(message)

                    Thread.sleep(500)
                }
            }).start()
        }

        override def onStop(): Unit = {
            flg = false
        }
    }
}

// 结果如下
-------------------------------------------
Time: 1628664585000 ms
-------------------------------------------
采集数据为:4
采集数据为:2
采集数据为:4
采集数据为:1

Kafka 数据源

SparkStreaming主要就是和kafka进行数据的对接的。

版本选型

ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

Kafka 0-10 Direct 模式

需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

依赖如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.10.1</version>
</dependency>
object DirectAPI {
    def main(args: Array[String]): Unit = {
        //1.创建 SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
        
        //2.创建 StreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        
        //3.定义 Kafka 参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux1:9092,linux2:9092,linux3:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
            "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
        
        //4.读取 Kafka 数据创建 DStream
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
            ssc, 
            LocationStrategies.PreferConsistent, 
            ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
        
        //5.将每条消息的 KV 取出
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
        
        //6.计算 WordCount
        valueDStream.flatMap(_.split(" "))
            .map((_, 1))
            .reduceByKey(_ + _)
            .print()
        
        //7.开启任务
        ssc.start()
        ssc.awaitTermination()
    }
}

// 查看 Kafka 消费进度
bin/kafka-consumer-groups.sh --describe --bootstrap-server linux1:9092 --group bd

DStream 转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

有无状态的区别就是,是否需要保存周期内的数据,之前的案例都是没有保存数据的,一个执行周期数据不会保存到下个执行周期使用,因此它是无状态的。

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

在这里插入图片描述

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
// transform方法可以将底层RDD获取到,获取到之后可以对其进行操作
// 1、DStream功能不完善
// 2、需要代码周期性的执行
// 考虑使用transform
// 代码执行位置:Driver(执行一次)
lines.transform(
    // 代码执行位置:Driver(周期性执行,一个采集周期执行一次)
    rdd => {
        rdd.map(
            str => {
                // 代码执行位置:Executor
                str
            }
        )
    }
)

// 代码执行位置:Driver
lines.map(
    str => {
        // 代码执行位置:Executor
        str
    }
)

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。底层就是RDD的join

val data9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val data8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

val map9999: DStream[(String, Int)] = data9999.map((_, 9))
val map8888: DStream[(String, Int)] = data8888.map((_, 8))

// 所谓的DStream的join操作,其实就是两个RDD的join
map9999.join(map8888).print()

// 结果如下
-------------------------------------------
Time: 1628667370000 ms
-------------------------------------------
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))
(b,(9,8))

lxj@lxj:~$ nc -lk 9999
b
b
b

lxj@lxj:~$ nc -lk 8888
b
b
b

有状态转化操作

UpdateStateByKey

UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

1、定义状态,状态可以是一个任意的数据类型。

2、定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
新。

使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。

更新版的 wordcount

object TestUpdateStateByKey {

    def main(args: Array[String]): Unit = {

        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        val datas: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

        val wordToCount: DStream[(String, Int)] = datas.flatMap(_.split(" ")).map((_, 1))

        // 无状态数据操作,只对当前的采集周期内的数据进行处理
        // 在某些场合下,需要保存数据统计结果(状态),实现数据的汇总
        // wordToCount.reduceByKey(_ + _).print()

        // 使用有状态操作时,需要设定检查点路径
        ssc.checkpoint("cp")

        // updateStateByKey:根据key对数据的状态进行更新
        // 传递的参数中含有两个值
        // 第一个值表示相同的可以的value数据
        // 第二个值表示缓存区相同key的value数据
        wordToCount.updateStateByKey(
            (seq: Seq[Int], buff: Option[Int]) => {
                Option(buff.getOrElse(0) + seq.sum)
            }
        ).print()

        ssc.start()
        ssc.awaitTermination()
    }
}

// 结果如下
-------------------------------------------
Time: 1628666622000 ms
-------------------------------------------
(word,2)
(hello,3)
(spark,1)

-------------------------------------------
Time: 1628666625000 ms
-------------------------------------------
(word,3)
(hello,4)
(spark,1)

lxj@lxj:~$ nc -lk 9999
hello word
hello spark
hello word
hello word

WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。就是把多个采集周期当做一个整体,一次处理多个采集周期的数据。每次滑动步长的距离。

窗口时长:计算内容的时间范围;

滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val wordToOne: DStream[(String, Int)] = line.map((_, 1))

// 窗口的范围应该是采集周期的整数倍
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(9))

windowDS.reduceByKey(_ + _).print()

// 结果如下
-------------------------------------------
Time: 1628668251000 ms
-------------------------------------------
(a,1)

-------------------------------------------
Time: 1628668254000 ms
-------------------------------------------
(a,1)
(b,1)
(c,1)

-------------------------------------------
Time: 1628668257000 ms
-------------------------------------------
(a,3)
(b,1)
(c,1)

-------------------------------------------
Time: 1628668260000 ms
-------------------------------------------
(a,2)
(b,1)
(c,1)

-------------------------------------------
Time: 1628668263000 ms
-------------------------------------------
(a,2)

lxj@lxj:~$ nc -lk 9999
a
b
c
a
a

关于 Window 的操作还有如下方法:

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;

(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。

(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。

val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val wordToOne: DStream[(String, Int)] = line.map((_, 1))

ssc.checkpoint("cp")

// reduceByKeyAndWindow需要设置检查点
// reduceByKeyAndWindow:当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除数据的方式
// 无需重复计算,目的是为了提升性能
val windowDS: DStream[(String, Int)] = wordToOne.reduceByKeyAndWindow(
    (x, y) => x + y,
    (x, y) => x - y,
    Seconds(9)
)

windowDS.print()

// 结果
-------------------------------------------
Time: 1628669016000 ms
-------------------------------------------
(a,1)
(b,1)

-------------------------------------------
Time: 1628669019000 ms
-------------------------------------------
(a,2)
(b,1)
(c,1)

-------------------------------------------
Time: 1628669022000 ms
-------------------------------------------
(a,3)
(b,1)
(c,1)

-------------------------------------------
Time: 1628669025000 ms
-------------------------------------------
(a,2)
(b,0)
(c,1)

-------------------------------------------
Time: 1628669028000 ms
-------------------------------------------
(a,1)
(b,0)
(c,0)

-------------------------------------------
Time: 1628669031000 ms
-------------------------------------------
(a,0)
(b,0)
(c,0)

lxj@lxj:~$ nc -lk 9999
a
b
a
c
a

DStream 输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

如果没有输出操作,那么会提示错误。

输出操作如下:

print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print()。

saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefixTime_IN_MS[.suffix]”。

saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。

foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库。foreachRDD是不会有时间戳的。

通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform()有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

注意:

  1. 连接不能写在 driver 层面(序列化)

  2. 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;

  3. 增加 foreachPartition,在分区创建(获取)。

优雅关闭

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。

所谓优雅关闭,就是计算节点不再接收新的数据,而是将现有的数据处理完毕之后,然后关闭。

如果需要关闭采集器,那么需要创建新的线程去处理,并且需要第三方组件中保存是否需要关闭采集器的状态,然后线程去轮询它。

new Thread(() => {
    // 优雅的关闭
    // 计算节点不再接受新的数据,而是将现有的数据处理完毕,然后关闭
    // mysql、redis、zk、hdfs等
    while (true) {
        // 判断是否需要关闭
        if (true) {
            // 获取sparkStreaming状态
            if (ssc.getState() == StreamingContextState.ACTIVE) {
                ssc.stop(true, true)
                System.exit(0)
            }
        }

        Thread.sleep(10000)
    }
})

重新启动后恢复上次保存的数据。

val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val wordToOne: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1))

    wordToOne.updateStateByKey(
        (seq: Seq[Int], buff: Option[Int]) => {
            Option(buff.getOrElse(0) + seq.sum)
        }
    ).print()

    ssc
})


ssc.checkpoint("cp")

ssc.start()

ssc.awaitTermination()
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-12 16:40:01  更:2021-08-12 16:42:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 10:18:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码