IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> DataFrame -> 正文阅读

[大数据]DataFrame

为什么推出Spark SQL?

  • ·关系数据库已经很流行
  • ·关系数据库在大数据时代已经不能满足要求
    • 首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据
    • ?其次,用户需要执行高级分析,比如机器学习和图像处理
  • ·在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统

而Spark SQL填补了这个鸿沟:

  • 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作
  • 其次,可以支持大数据中的大量数据源和数据分析算法
  • Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力

sparkSql和 spark on hive的区别:(24条消息) Spark、Hive、Hbase比较_好啊啊啊啊的博客-CSDN博客


?DataFrame概述:

  • DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原 有的RDD转化方式更加简单易用,而且获得了更高的计算性能。
  • Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。

基本概念:

  • DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息
  • SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。

创建SparkSession对象:

可以通过如下语句创建一个SparkSession对象:

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

创建和保存DataFrame:

创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:

spark.read.text("people.txt"):#读取文本文件people.txt创建DataFrame(同理可以有json/parquet)
或者也可以使用如下格式的语句:
?spark.read.format("text").load("people.txt"):#读取文本文件people.json创建DataFrame;
?spark.read.format("json").load("people.json"):#读取JSON文件
?spark.read.format("parquet").load("people.parquet"):#读取Parquet文件people.parquet创建
DataFrame

可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:

df.write.text("people.txt")
df.write.json("people.json“)
#或者也可以使用如下格式的语句:
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format ("parquet").save("people.parque")

示例:

下面从示例文件people.json中创建一个DataFrame,名称为peopleDF,把peopleDF保存到另外一个JSON文件中,然后,再从peopleDF中选取一个列(即name列),把该列数据保存到一个文本文件中:

peopleDF = spark.read.format("json").\
load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("json").\
save("file:///usr/local/spark/mycode/sparksql/newpeople.json")

tips:前面提到的所有的df都是一个DataFrame?


DataFrame的常用操作:

df=spark.read.json(“people.json”)
1.printSchema()
?? ?df.printSchema()
2.select()
?? ?df.select(df["name"],df["age"]+1).show()
3.filter()
?? ?df.filter(df["age">20]).show()
4.groupBy()
?? ?df.groupBy("age").count().show()
5.sort()
?? ?df.sort(df["age"].desc()).show()
?? ?df.sort(df["age"].desc(),df["name"].asc()).show()

一个完整示例:

from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
from pyspark import Row

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

# people = spark.sparkContext.textFile("file:///home/mls/abc/people.txt").map(lambda x:x.split(",")).\
# map(lambda x:Row(name=x[0],age=int(x[1])))
# schemePeople = spark.createaDataFrame(people)
# schemePeople.show()
# schemePeople.createOrReplaceTempView("people")
# personDF = spark.sql("select * from people where age > 20")
# personDF.show()
# personRDD = personDF.rdd.map(lambda p:"Name:"+p.name+", Age:"+str(p.age))
# personRDD.foreach(print)

#利用反射机制推断RDD模式
liter = spark.sparkContext.textFile("file:///home/mls/abc/test.txt").map(lambda x:x.split("\t")).\
map(lambda x:Row(year=x[0],timeSpan=x[1],name=x[2],gender=x[3],country=x[4],area=x[5],language=x[6],reason=x[7],type=x[8]))
schemeLiter = spark.createDataFrame(liter)
#必须注册为临时表才能供下面的查询使用 注意不要用全局的Global
schemeLiter.createOrReplaceTempView("liter")
literDF = spark.sql("select * from liter")
literDF.show()
#DataFrame中的每个元素都是一行记录,包含name和country两个字段,分别用p.name和p.age来获取值
literRDD = literDF.rdd.map(lambda p:"Name:"+p.name+", country:"+str(p.country))
literRDD.foreach(print)

#利用反射机制推断RDD模式
#生成表头
schemeString = "year timeSpan name gender country area language reason type"
fields = [StructField(field_name,StringType(),True) for field_name in schemeString.split(" ")]
schema = StructType(fields)
# 生成表中的记录
lines = spark.sparkContext.textFile("file:///home/mls/abc/test.txt").map(lambda x:x.split("\t"))
liters = lines.map(lambda x:Row(x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8]))
# 把表头和表中的记录拼装在一起
litersDF = spark.createDataFrame(liters,schema)
litersDF.show()

效果:(上面有部分没截图)?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-23 15:49:12  更:2021-12-23 15:49:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:56:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码