IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> python、scala语言的spark学习 -> 正文阅读

[大数据]python、scala语言的spark学习

安装基础环境

(1)scala的spark环境

1.idea新建scala项目

? ? ? idea 首先安装scala插件,然后如下链接新建scala项目。IEDA编写Scala代码_大数据面壁者的博客-CSDN博客

2.pom引入spark??Downloads | Apache Spark

<properties>
        <spark.version>3.2.1</spark.version>
        <scala.version>2.12</scala.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

3.注意scala-sdk版本

?? ?如上?artifactId: spark-core_2.12 需要scala版本为2.12。scala不需要下载,idea可以下载指定版本的scala。File--Project Structure--Libraries--+ download一个版本即可。

(2)python的spark环境?

  1. 安装 Anaconda?Anaconda的安装教程_小橙子喜欢吃果冻的博客-CSDN博客
  2. 安装 Jupyter
pip install jupyter

? ? ?3.安装 PySpark 使用阿里云的源安装,如果安装报错 Could not fetch URL .... ,可以尝试其他源。

pip install pyspark -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

? ? 4.启动jupyter

? ? ? 需要注意文件夹要事先创建好,这个就是你的笔记本文件夹。启动后,浏览器会弹出,可以在控制台里面找到Jupyter notebook的链接

jupyter notebook --ip=0.0.0.0 --notebook-dir='E:\\JupyterWorkspace'

? ?5. 运行测试代码
为了测试安装的结果,我们新建一个笔记本,在单元格中写入如下代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
?
##初始化
spark = SparkSession.builder.master("local[*]").appName("Test").getOrCreate()
## 0 + 1 + 2 + 3 + 4?
spark.range(0, 5).select(col("id").cast("double")).agg({'id': 'sum'}).show()
## 关闭
spark.stop()

? ? ? 代码的作用是 Spark 对元素为 0~4 的数组进行求和处理,运行代码,Jupyter notebook 会展示运行结果。如下图所示:

? ? ?这里特别说明的是,代码中的参数 local[*] 指明了 Spark 基于本地操作系统运行,如果基于 YARN、Mesos 或者 Kubernetes,只需要对应修改该参数即可。

编写代码

(1)scala语言

1.新建 SparkSession 对象

val spark = SparkSession
.builder()
.master("yarn-client")
.appName("New SS")
.config("spark.executor.instances", "10")
.config("spark.executor.memory", "10g")
.getOrCreate()

2.编写spark代码

package com.scala.demo

import org.apache.spark.sql.SparkSession

object hello {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("New SS")
      .config("spark.executor.instances", "10")
      .getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3))
    println(rdd.count())
  }
}

(2)python语言?

1.新建 SparkSession 对象

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("New SS").config("spark.executor.instances", "10").config("spark.executor.memory", "10g").getOrCreate()

2.编写spark代码?

Spark知识

(1)RDD分类

? ? ?RDD 是 Spark 最核心的数据结构,RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是 Spark 对数据的核心抽象,也是最关键的抽象,它实质上是一组分布式的 JVM 不可变对象集合,不可变决定了它是只读的,所以 RDD 在经过变换产生新的 RDD 时,原有 RDD 不会改变。

? ? 弹性主要表现在两个方面:?

  • 在面对出错情况(例如任意一台节点宕机)时,Spark 能通过 RDD 之间的依赖关系恢复任意出错的 RDD(如 B 和 D 可以算出最后的 RDD),RDD 就像一块海绵一样,无论怎么挤压,都像海绵一样完整;
  • 在经过转换算子处理时,RDD 中的分区数以及分区所在的位置随时都有可能改变。

? ? 我们可以将 RDD 的类型分为以下几类:

  • 并行集合;
  • 从 HDFS 中读取;
  • 从外部数据源读取;
  • PairRDD。

? ? 了解了 RDD 的类型,接下来我们逐个讲解它们的内容:

并行化集合

? ? 这种 RDD 纯粹是为了学习,将内存中的集合变量转换为 RDD,没太大实际意义。

//val spark: SparkSession = .......
val rdd = spark.sparkcontext.parallelize(Seq(1, 2, 3))

从 HDFS 中读取

? ? 这种生成 RDD 的方式是非常常用的,

//val spark: SparkSession = .......
val rdd = spark.sparkcontext.textFile("hdfs://namenode:8020/user/me/wiki.txt")

从外部数据源读取

? ? Spark 从 MySQL 中读取数据返回的 RDD 类型是 JdbcRDD,顾名思义,是基于 JDBC 读取数据的,这点与 Sqoop 是相似的,但不同的是 JdbcRDD 必须手动指定数据的上下界,也就是以 MySQL 表某一列的最值作为切分分区的依据。

//val spark: SparkSession = .......
val lowerBound = 1
val upperBound = 1000
val numPartition = 10
val rdd = new JdbcRDD(spark.sparkcontext,() => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
},
"SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
lowerBound,
upperBound,
numPartition,
r => r.getString(1)
)

? ? ?上面需要在pom文件引入mysql

     <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>

? ? 既然是基于 JDBC 进行读取,那么所有支持 JDBC 的数据库都可以通过这种方式进行读取,也包括支持 JDBC 的分布式数据库,但是你需要注意的是,从代码可以看出,这种方式的原理是利用多个 Executor 同时查询互不交叉的数据范围,从而达到并行抽取的目的。但是这种方式的抽取性能受限于 MySQL 的并发读性能,单纯提高 Executor 的数量到某一阈值后,再提升对性能影响不大。

? ? 上面介绍的是通过 JDBC 读取数据库的方式,对于 HBase 这种分布式数据库来说,情况有些不同,HBase 这种分布式数据库,在数据存储时也采用了分区的思想,HBase 的分区名为 Region,那么基于 Region 进行导入这种方式的性能就会比上面那种方式快很多,是真正的并行导入。

//val spark: SparkSession = .......
val sc = spark.sparkcontext
val tablename = "your_hbasetable"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, tablename)
val rdd= sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// 利用HBase API解析出行键与列值
rdd_three.foreach{case (_,result) => {
val rowkey = Bytes.toString(result.getRow)
val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
}

? ? 值得一提的是 HBase 有一个第三方组件叫 Phoenix,可以让 HBase 支持 SQL 和 JDBC,在这个组件的配合下,第一种方式也可以用来抽取 HBase 的数据,此外,Spark 也可以读取 HBase 的底层文件 HFile,从而直接绕过 HBase 读取数据。说这么多,无非是想告诉你,读取数据的方法有很多,可以根据自己的需求进行选择。

? ? 通过第三方库的支持,Spark 几乎能够读取所有的数据源,例如 Elasticsearch,所以你如果要尝试的话,尽量选用 Maven 来管理依赖。

PairRDD

? ? PairRDD 与其他 RDD 并无不同,只不过它的数据类型是 Tuple2[K,V],表示键值对,因此这种 RDD 也被称为 PairRDD,泛型为 RDD[(K,V)],而普通 RDD 的数据类型为 Int、String 等。这种数据结构决定了 PairRDD 可以使用某些基于键的算子,如分组、汇总等。PairRDD 可以由普通 RDD 转换得到:

//val spark: SparkSession = .......
val a = spark.sparkcontext.textFile("/user/me/wiki").map(x => (x,x))

(2)常见算子例子(scala语言)?

flatMap

在spark中map函数和flatMap函数是两个比较常用的函数。其中

  • map:对集合中每个元素进行操作。
  • flatMap:对集合中每个元素进行操作然后再扁平化。

理解扁平化可以举个简单例子

val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.flatmap(x=>(x._1+x._2)).foreach(println)


输出结果为

A
1
B
2
C
3

如果用map

val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.map(x=>(x._1+x._2)).foreach(println)

输出结果

A1
B2
C3

所以flatMap扁平话意思大概就是先用了一次map之后对全部数据再一次map。

reduceByKey

reduceByKey(function)
? ? reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行function的reduce操作(如前所述),因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

reduceByKey(_+_)是reduceByKey((x,y) => x+y)的一个 简洁的形式

 val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("New SS")
      .config("spark.executor.instances", "10")
      .getOrCreate()
    var rdd = spark.sparkContext.parallelize(List("a","b","a"))
    
    var rdd2 = rdd.map((_,1))
    rdd2.foreach(println(_))
    
    rdd2.reduceByKey((x,y)=>x+y).foreach(println(_))

输出结果

(b,1)
(a,2)

groupByKey?

对Key-Value形式的RDD的操作,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)//给value加上key,key为对应string的长度
b.groupByKey.collect
//结果 Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

groupByKey输出为 key value对 ,后续可以通过map对每个分组进行处理

groupByKey和reduceByKey区别

reduceByKey与groupByKey功能一样,只是实现不一样。本函数会先在每个分区聚合然后再进行总的统计,如图:

?而groupByKey则是

?因此,本函数比groupByKey节省了传播的开销,尽量少用groupByKey

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-18 10:14:40  更:2021-09-18 10:15:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 13:18:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码