IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkStreaming-----第一个wordcount算子,Driver HA -> 正文阅读

[大数据]SparkStreaming-----第一个wordcount算子,Driver HA

1.sparkStreaming
流式处理框架,是Spark API的扩展,RDD最终封装到DStream中

2.第一个wordcount

pom依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
    <scope>provided</scope>
</dependency>

import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1.local[2] 一个线程接收数据,一个线程处理数据
 * 2.Durations.second(10):每10秒将接收来的数据做一次处理
 * 3.SparkStreaming启动之后,7*24小时不间断运行,不能添加新的逻辑代码
 * 4.所有逻辑代码完成后必须有 output operation类算子
 * */

object SparkStreaming01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("streaming01")
    conf.setMaster("local[2]")  //一个线程接收数据,一个线程处理数据
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10)) //设置每10秒执行一次处理数据
    val lines = ssc.socketTextStream("hadoop102", 999)  //设置了虚拟机和端口号
    //统计所有单词出现次数
    val words=lines.flatMap(line=>line.split(" "))
    val pairWords=words.map(word=>new Tuple2(word,1))
    val result= pairWords.reduceByKey((v1,v2)=>{v1 + v2})
    //output operation类算子
    result.print()
    ssc.start()  //启动sparkStreaming
    ssc.awaitTermination()
  }
}

数据来源
在这里插入图片描述

3.foreachRDD算子

1.foreachRDD可以获取DStream中的RDD,可以对RDD使用RDD的算子操作,但是一定要使用RDD的action算子触发执行

 result.foreachRDD((rdd: RDD[(String, Int)]) => {
      val rdd1: RDD[String] = rdd.map(tp => {
        println("======="+tp)
        tp._1 + "=" + tp._2
      })
      rdd1.count()
    })

4.transform
transformation类算子,对Dstream做RDD到RDD的任意操作

5.updateStateByKey
transformation类算子,对每一个key的状态进行更新

6.Driver HA

第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-02 16:50:07  更:2021-12-02 16:51:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:49:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码