IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark-Core编程 -> 正文阅读

[大数据]Spark-Core编程

Spark简介

Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架。Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集,具有以下特点。

  1. 运行速度快Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。
  2. 易用性好Spark不仅支持Scala编写应用程序,而且支持JavaPython等语言进行编写,特别是Scala是一种高效、可拓展的语言,能够用简洁的代码处理较为复杂的处理工作。
  3. 通用性强Spark生态圈即BDAS(伯克利数据分析栈)包含了Spark CoreSpark SQLSpark StreamingMLLibGraphX等组件,这些组件分别处理Spark Core提供内存计算框架、SparkStreaming的实时处理应用、Spark SQL的即席查询、MLlibMLbase的机器学习和GraphX的图处理。
  4. 随处运行Spark具有很强的适应性,能够读取HDFSCassandraHBaseS3Techyon为持久层读写原生数据,能够以MesosYARN和自身携带的Standalone作为资源管理器调度job,来完成Spark应用程序的计算。

Spark结构设计

Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。

在驱动程序中,通过SparkContext主导应用的执行,SparkContext可以连接不同类型的Cluster Manager(YARN、Mesos),连接后,获得集群节点上的Executor,Spark的主入口,每个JVM只能有一个活跃的。

  • Spark 2.0+加入了SparkSession应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext
  • 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整

搭建Spark Linux集群

Spark作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行,在国内工作中主流的环境为 YARN。 接下来我们来学习在强大的Yarn环境下Spark是如何工作的。

先搭建Hadoop的集群环境,确保可以正常工作,在此基础上继续搭建Spark集群。

修改Hadoop配置文件yarn-site.xml

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property> 

Yarn application has already ended! It might have been killed or unable to launch application master.

安装Spark配置环境

# 1. 解压spark,并重命名为spark-yarn
[root@node01 conf]# tar -zxvf spark-2.0.0-bin-hadoop2.7.tgz
[root@node01 local]# mv spark-2.0.0-bin-hadoop2.7 spark-yarn

# 2. 修改spark-env.sh文件,添加java环境和yarn配置地址
[root@node01 local]# cd spark-yarn/conf/
[root@node01 conf]# mv spark-env.sh.template spark-env.sh
[root@node01 conf]# vim spark-env.sh
export JAVA_HOME=/usr/local/src/jdk
export HADOOP_HOME=/usr/local/src/hadoop
export HADOOP_CONF_DIR=/usr/local/src/hadoop/etc/hadoop
export LD_LIBRARY_PATH=/usr/local/src/hadoop/lib/native
export SPARK_MASTER_IP=master
export SPARK_LOCAL_DIRS=/usr/local/src/spark/tmp
export SPARK_LOG_DIR=/usr/local/src/spark/logs
export SPARK_MASTER_PORT=7077
export SPARK_DRIVER_MEMORY=512M
[root@node01 conf]# vim slaves 
node02
node03

# 3. 修改环境变量
[root@node01 conf]# vim ~/.bash_profile
## SPARK_ENV
export SPARK_HOME=/usr/local/spark-yarn
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[root@node01 conf]# source ~/.bash_profile

# 4. 使用scp命令把scala, spark和profile分发给其他两个节点
[root@node01 local]# scp spark node02:$PWD

# 5. 启动HDFS以及Spark
[root@node01 conf]# start-all.sh
[root@node01 sbin]# ./start-all.sh 

# 6. 查看服务情况
[root@node01 spark-yarn]# jps
15600 NameNode
15782 SecondaryNameNode
15927 ResourceManager
16648 Jps
16569 Master

查看Web界面,默认访问端口是 8080

Spark YARN平台启动测试

在这里插入图片描述

# 在 spark-env.sh 中添加如下语句
export LD_LIBRARY_PATH=/usr/local/src/hadoop/lib/native
# 创建HDFS目录
hdfs dfs -mkdir /spark-yarn-jars
# 将本地spark的jar包上传到hdfs
hdfs dfs -put ${SPARK_HOME}/jars/*.jar /spark-yarn-jars
# 配置spark.yarn.jars
vim spark-defaults.conf
spark.yarn.jars hdfs://master:9000/spark-yarn-jars/*.jar
spark-shell --master yarn --deploy-mode client

image.png

在YARN平台上运行Spark提供的默认案例

spark-submit --master yarn  --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.0.0.jar 10

# 参数说明:
--class	    Spark程序中包含主函数的类
--master	Spark程序运行的模式(环境)

image.png

Spark RDD简介

MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。显然,如果能将结果保存在内存当中,就可以大量减少IO。RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销。
RDDResilient Distributed Datasets弹性分布式数据集),是Spark中最重要的概念,可以简单的把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据被划分为一到多个分区,所有分区数据分布存储于一批机器中(内存或磁盘中),这里的分区可以简单地和Hadoop HDFS里面的文件块来对比理解。
在这里插入图片描述
RDD主要有两大类操作,分别为转换(Transformations)和操作(Actions)。转换主要是指把原始数据集加载到RDD以及把一个RDD转换为另外一个RDD。而操作主要指把RDD存储到硬盘或触发转换执行。

RDD原理

Spark中的所有转换都是懒惰(Lazy)操作,它们只是记住了需要这样的转换操作,并不会马上执行,只有等到Actions操作的时候才会根据RDD的依赖关系生成DAG,并从起点开始真正启动计算过程进行计算。
在这里插入图片描述

先经过转换textFile把数据从HDFS加载到RDDA以及RDDC,这时其实RDDARDDC中都没有数据的,再到后面的转换flatMapmapreduceByKey等,分别把RDDA转换为RDDBRDDF以及RDDCRDDE等,这些转换都是没有执行的。它们相当于先做了个计划,但是并没有具体执行,等到执行操作saveAsSequenceFile时,才开始真正触发并执行任务。

这一系列处理称为一个“血缘关系(Lineage)",即DAG拓扑排序的结果。采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化 (pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系就是把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。

RDD Transformation & Action

常用的Transformation函数

函数描述
map(f:T=>U)RDD数据集中的每个元素都使用f,返回一个新的RDD
filter(f:T=>Bool)RDD数据集中的每个元素都使用f,返回使用ftrue的元素构成的RDD
flatMap(f:T=>seq(U))map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)
distinct([n])返回一个新的数据集,其中包含源数据集的不同元素。
groupByKey(numTasks)返回(K,Seq(V)),根据相同的键分组
sortByKey(asc)(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值指定,按键以升序或降序排序asc
reduceByKey(f:(V,V)=>V)用一个给定的f作用在groupByKey而产生的(K,Seq(V)),比如求和
union(otherDataset)返回一个新的dataset,包含源dataset和给定dataset的元素的集合

常用的Action函数

函数描述
reduce(f:(T,T)=>T)通过函数f聚集数据集中的所有元素,f函数接收两个参数,返回一个值
collect()返回数据集中所有的元素
count(p:Int=>Bool)返回数据集中所有元素的个数
take(n:Int)返回前n个元素
saveAsTextFile(path)将数据集的元素以textfile的形式保存到本地文件系统、hdfs或者任何其它Hadoop支持的文件系统
saveAsObjectFile(path)使用Java序列化以简单的格式编写数据集的元素,然后可以使用进行加载SparkContext.objectFile
foreach(f:T=>U)对数据集中的每个元素都执行函数f

创建RDD

RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点的存放元素的集合。RDD有3种不同的创建方法。一种是对程序种存在的基本数据结构中的集合进行并行化(如Set、List、Array),另一种是通过已有RDD转换得到新的RDD,这两种都是通过内存已有集合创建RDD。还有一种是直接读取外部存储的数据集。

val spark = SparkSession.builder().master("local").appName("RDD Transform").getOrCreate()
val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
// 转换函数 --> 匿名函数
def mapFn(num: Int): Int = num * 2
// 使用转换函数 转换后的RDD
val mapRDD = rdd.map(mapFn)
// 遍历每一个元素,返回转换后的值
val mapRDD = rdd.map(_*2)
mapRDD.foreach(println)

从内存中已有的数据创建RDD

parallelize方式

SparkContext类中有两个方法:parallelizemakeRDD。通过parallelizemakeRDD可将单机数据创建为分布式RDD。
parallelize(Seq seq, int numSlices)有两个参数可以输入,要转换的集合,必须是Seq集合,分区个数,默认是2。

Seq是一种序列。所谓序列,指的是一类具有一定长度的可迭代访问的对象,其中每个元素均带有一个从0开始计数的固定索引位置。

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 1)
val sum = rdd.reduce(_ + _)
print(sum, rdd.partitions.size)  // (21,1)

makeRDD方式

makeRDD(seq:Seq[T], numSlices),底层通过parallelize函数把一般数据结构加载为RDD,默认分区是1

val rdd = spark.sparkContext.makeRDD(Seq(1, 2, 3, 4, 5, 6), 1)
rdd.groupBy(item => item % 2).foreach(println)

(0,CompactBuffer(2, 4, 6))
(1,CompactBuffer(1, 3, 5))

从外部存储创建RDD

从外部存储创建RDD是指直接读取一个存放在文件系统的数据文件创建RDD,通过SparkContext对象的textFile方法读取数据集,支持多种类型数据集,如目录、文本文件、压缩文件和通配符匹配的文件等。并且允许设定分区个数。

spark.sparkContext.textFile("spark_core_1\\src\\main\\resources\\1.txt")
  .flatMap(item => item.split(" "))
  .map(item => (item, 1))
  .reduceByKey(_+_)
  .foreach(println)

(Spark,1)
(Hello,2)
(Scala,1)

RDD转换和操作函数

map:将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目

// 使用map函数对RDD中每个元素进行+10操作
spark.sparkContext.parallelize(List(1, 2, 3, 4)).map(item => item + 10).foreach(println)
    
// 使用map函数产生键值对RDD
spark.sparkContext.parallelize(List("Hello", "Python", "Hello", "Hadoop", "Spark")).map(item => (item, 1)).foreach(println)

flatMap:使用flatMap对集合中的每个元素进行操作再扁平化(把二维数组拉平为一维数组)

// map 操作
spark.sparkContext.parallelize(List("Hello Spark", "Hello Scala", "Hello Spark","Spark")).map(item=>item.split(" ")).foreach(println)
[["Hello","Spark"],["Hello","Scala"],["Hello","Spark"],["Spark"]]

// flatMap 操作
spark.sparkContext.parallelize(List("Hello Spark", "Hello Scala", "Hello Spark","Spark")).flatMap(item=>item.split(" ")).foreach(println)
["Hello","Spark","Hello","Scala","Hello","Spark","Spark"]
val rdd = spark.sparkContext.makeRDD(List(List(1, 2), 3, List(4, 5)))

// 模式匹配转换
rdd.flatMap(item=>{
  item match {
    case list: List[_] => list // 列表直接返回 _表示通配符,指代任意类型的元素 List<?> list == list:List[_]
    case data: Int => List(data) // 数值封装为列表返回
  }
})

// 简写形式
rdd.flatMap{
  case list: List[_] => list // 列表直接返回
  case data: Int => List(data) // 数值封装为列表返回
}

sortBy:是对标准RDD进行排序的方法,可接受三个参数

  • 参数1是一个函数 f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  • 参数2是 ascending,决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序。
  • 参数3是 numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为 this.partitions.size
// 根据列表中的单词字典序排序
val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.sortBy(item => item._1, ascending = false, 1).foreach(println)
    
(Scala,8)
(Hello,3)
(Spark,2)
(Java,2)

sortByKey:作用于 Key-Value 形式的 RDD,并对 Key 进行排序,可接受两个参数,分别是 ascendingnumPartitions

val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.sortByKey(ascending = true, 1).foreach(println)

(Hello,3)
(Java,2)
(Scala,8)
(Spark,2)

take:用于获取 RDD 中从 0 到 size-1 下标的元素

val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.take(2).foreach(println)
    
(Hello,3)
(Spark,2)

filter:对元素进行过滤,对每个元素应用给定的函数,返回值为true的元素在RDD中保留,为false的则被过滤掉

// 过滤掉data RDD中元素为奇数的元素
val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 8, 9, 0, 1, 1, 2, 3, 4, 4, 5))
rdd.filter(item => item % 2 == 0).foreach(println)

distinct**:去重,针对RDD中重复的元素,只保留一个元素

val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 8, 9, 0, 1, 1, 2, 3, 4, 4, 5))
rdd.distinct.foreach(println)

union:合并RDD,需要保证两个RDD元素类型一致

val rdd1 = spark.sparkContext.parallelize(List(1, 2, 3))
val rdd2 = spark.sparkContext.parallelize(List(3, 4, 5))
rdd1.union(rdd2).foreach(println)

groupBy:根据指定参数进行分组

val rdd = spark.sparkContext.makeRDD(List("Hello", "Spark", "Hello", "Scala"))
rdd.groupBy(item => item)
  //.map(item=>(item._1,item._2.size))
  .map{case (word, itr) => (word, itr.size)}
  .foreach(println)

(Spark,1)
(Hello,2)
(Scala,1)

collect:返回RDD中所有的元素
count():计算RDD中所有元素个数
saveAsTextFile:把RDD保存到HDFS中

键值对RDD操作

有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。
join:把两个rdd根据键值进行分组整合起来

val rdd1 = spark.sparkContext.parallelize(List(("K1","V1"),("K2","V2"),("K3","V3")))
val rdd2 = spark.sparkContext.parallelize(List(("K1","W1"),("K2","W2")))
rdd1.join(rdd2).foreach(println)  // (K1,(V1,W1))

zip:将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

val rdd1 = spark.sparkContext.makeRDD(1 to 5,2)
val rdd2 = spark.sparkContext.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).foreach(println)  // (1,A)

keys/values:获取键值对的所有键/值

val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.keys.foreach(println)
kvRDD.values.foreach(println)

mapValues:是针对键值对(Key,Value)类型的数据中的Value进行Map操作,而不对Key进行处理

kvRDD.mapValues(item => item * 2).foreach(println)  // (Hello,4)

lookup:返回指定K的所有V值

val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.lookup("Spark").foreach(println)  // 4

reduceByKey:把相同key的value进行操作

val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.reduceByKey((num1, num2) => num1 + num2).foreach(println)  //(Hello,6)

keyBy:为各个元素,按指定的函数生成key,形成key-value的RDD。

val rdd = spark.sparkContext.parallelize(List("dog","tiger","lion","cat","eagle"))
rdd.keyBy(item=>item.length).foreach(println)  // (3,dog)

创建Maven项目

Spark由Scala 语言开发的,接下来的开发所使用的语言也为Scala,当前使用的Spark版本为2.4.7,默认采用的Scala编译版本为2.11.7开发前请保证IDEA开发工具中含有Scala开发插件。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
  </dependency>
</dependencies>

编写WordCount案例

在这里插入图片描述

def main(args: Array[String]): Unit = {
  // 1. 建立 Spark Application 应用
  val conf = new SparkConf().setMaster("local").setAppName("wordCount")
  val context = new SparkContext(conf)
  // 2. 读取文件获取每一行数据
  val rdd = context.textFile("spark_core_1\\src\\main\\resources\\*.txt")
  // 3. 将数据根据单词进行拆分,形成一个个单词
  val wordRDD = rdd.flatMap(item=>item.split(" "))
  // 4. 将数据根据单词进行分组,便于统计
  val groupRDD = wordRDD.groupBy(item=>item)
  // 5. 对分组的数据进行转换
  val wordCountRDD = groupRDD.map(item=>(item._1,item._2.size))
  // 6.将转换结果采集到控制台打印出来
  wordCountRDD.collect.foreach(item=>println(item))
  // 7.将转换结果合并为一个分区保存到文件中
  wordCountRDD.repartition(1).saveAsTextFile("spark_core_1\\output\\demo1")
}

Spark聚合优化

在这里插入图片描述
在这里插入图片描述

def main(args: Array[String]): Unit = {
  // 1. 建立 Spark Application 应用
  val conf = new SparkConf().setMaster("local").setAppName("wordCount")
  val context = new SparkContext(conf)
  // 2. 读取文件获取每一行数据
  val rdd = context.textFile("spark_core_1\\src\\main\\resources\\*.txt")
  // 3. 将数据根据单词进行拆分,形成一个个单词
  val wordRDD = rdd.flatMap(item => item.split(" "))
  // 4. 将单词封装为Tuple (hello,1)
  val wordTupleRDD = wordRDD.map(item => (item, 1))
  // 5. 分组聚合,Spark提供了reduceByKey根据相同的word分组聚合
  val wordCountRDD = wordTupleRDD.reduceByKey((num1, num2) => num1 + num2)
  // 6.将转换结果采集到控制台打印出来
  wordCountRDD.collect.foreach(item=>println(item))
  // 7.将转换结果合并为一个分区保存到文件中
  wordCountRDD.repartition(1).saveAsTextFile("spark_core_1\\output\\demo2")
}

使用IDEA打包案例到Spark集群运行

package com.example

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("wordCount")
    val context = new SparkContext(conf)
    context.textFile(args(0)).flatMap(line=>line.split(",")).groupBy(word=>word).map(item=>(item._1,item._2.size)).foreach(println)
  }
}

先使用 mavenclear 命令清空项目,再使用 **IDEA Build** 菜单编译项目,最后使用mavenpackage 打包,并上传到 /root/spark_data 目录备用。

# 使用本地文件运行
[root@master ~]$ spark-submit --master local --class com.example.WordCount spark_core-1.0-SNAPSHOT.jar file:///root/spark_data/words.txt
# 上传文件到hdfs的data目录上,在运行
[root@master ~]$ spark-submit --master local --class com.example.WordCount spark_core-1.0-SNAPSHOT.jar /data/words.txt
(spark,4)
(hive,2)
(hadoop,4)
(java,2)

在这里插入图片描述

文件的存储与读取

在实际开发中,所要读取的文本格式不仅仅只是普通的文本文件,还包含其他类型的文本,例如JSONSequenceFile。并且,当数据计算或处理结束后,通常需要将结果保存,用于应用或分析。
Spark支持一些常见文件格式如下:

格式名称结构化描述
文本文件普通的文本文件,每一行一条记录
JSON半结构化常见的基于文本的格式,半结构化;大多数库都要求每一行一条记录
CSV非常常见的基于文本的格式,通常在电子表格应用中使用
SequenceFile一种用于键值对数据的文件,常见Hadoop文件格式

JSON文件的读取与存储

  • JSON文件读取,将数据作为文本文件,对JSON数据进行解析。这种方法要求文件每行是一条JSON记录,如果记录跨行,就需要读取整个文件,对每个文件进行解析。
{"name":"jack","age":12}
{"name":"lili","age":22}
{"name":"cc","age":11}
{"name":"vv","age":13}
{"name":"lee","age":14}
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.73</version>
</dependency>
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDD {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("json read write")
    val sc = new SparkContext(conf)
    val jsonRDD = sc.textFile("file:///D:\\spark_data\\people.json")
      .map(JSON.parseObject)
      .map(item=>(item.getString("name").toUpperCase,item.getInteger("age")+10))
    // 保存文件,指定保存文件个数
    jsonRDD.repartition(1).saveAsTextFile("output/json")
  }
}

CSV文件的读取与存储

逗号分隔值(CSV)文件每行都有固定数据的字段,字段间用逗号隔开。在CSV的所有数据字段剧没有包含换行符的情况下,可以使用textFile读取并解析数据。如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段。

# CSV格式
name;age;job
Jorge;30;Developer
Bob;32;Developer
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("json read write")
  val sc = new SparkContext(conf)
  val csvRDD = sc.textFile("file:///D:\\spark_data\\people.csv")
  .map(line=>line.split(";").toList)
  .map(item=>item.mkString(","))
  // 保存文件,指定保存文件个数
  csvRDD.repartition(1).saveAsTextFile("/output/csv")
}

SequenceFile的读取与存储

SequenceFile是由没有相对关系结构的键值对组成的常用Hadoop格式。

  • SequenceFile有专门读取接口,可以调用SparkContextsequenceFile(path,keyClass,valueClass,minPartitions)实现。SequenceFile使用的是HadoopWritable类型,所以keyClassvalueClass参数必须定义为正确的Writable类。
  • SequenceFile文件的存储首先需要保证有一个键值对类型的RDD,然后直接调用saveSequenceFile(path)保存数据,自动将键和值转化为Writable类型。
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.3</version>
</dependency>
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.{IntWritable, Text}
object SparkRDD {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("json read write")
    val sc = new SparkContext(conf)
    // 保存
    //val rdd = sc.makeRDD(List(("Java", 88), ("Python", 100), ("Spark", 99), ("Scala", 98)))
    //rdd.saveAsSequenceFile("output/sequence")
    
    // 读取
    sc.sequenceFile("output/sequence",classOf[Text],classOf[IntWritable])
      .map{case (x,y)=>(x.toString,y.get)}
      .foreach(println)
  }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:05:37  更:2022-04-15 00:12:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:40:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码