IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 结构化API-DataFram、SparkSQL -> 正文阅读

[大数据]Spark 结构化API-DataFram、SparkSQL

前言:本文主要介绍Spark中结构化API的使用。

一、数据源

Read API的结构:

DataFrameReader.format(文件类型).option(属性,值).schema(自定义的模式).load(文件路径)

format、schema、一系列option选项,每一步转换都会返回一个DataFrameReader。

例如:

spark.read.format("csv")
            .option("headr",true)
            .option("mode",FAILFAST)
            .option("inferSchema",true)
            .load("D:/data/spark-data.csv")

读取模式:通过 option("mode",值)设置

  • permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中
  • dropMalformed:删除包含错误格式记录的行
  • failFast:遇到错误格式记录后立即返回失败

Write API:

DataFrameWriter.format(文件格式)
                .option(属性,值)
                .partitionBy(字段)
                .bucketBy(字段)
                .sortBy(字段)
                .save(路径)

1.CSV:

读文件

spark.read.format("csv")
            .option("headr",true)
            .option("mode",FAILFAST)
            .option("inferSchema",true)
            .load("D:/data/spark-data.csv")

写文件

df.write.repartition(1)
        .format("csv")
        .mode("overwrite")
        .option("sep","\t")
        .save("D:/spark/data/spark-data.csv")

2.JSON

读文件

spark.read.format("json")
          .option("header",true")
          .option("inferSchmea",true)
          .load("D:/spark/data/spark-data.json")

写文件

df.write.repartition(1)
        .format("json")
        .mode("overwrite")
        .option("sep","\t")
        .save("D:/spark/data/spark-data.json")

数据的读取格式基本都一样。

二、API操作

本文使用的数据为

航班数据

DEST_COUNTRY_NAMEORIGIN_COUNTRY_NAMEcount
United States? ? ? ? ? ? ??Romania15
..................
United StatesCroatia

1

1.取列:col()? ? column()$"列名"? ?'列名

?2.selectExpr函数

可以利用selectExpr来构建复杂的表达式来创建DataFrame

df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

?3.添加列:withColumn(列名,列)

df.withColumn("numberOne",lit(1)).show(2)

4.重命名列 withColumnRename(原列名,新列名)

df.withColumnRenamed("DEST_COUNTRY_NAME","DEST").show(2)

?

5. 删除列:drop(列名)

df.drop("DEST_COUNTRY_NAME").show(2)

?

?6.过滤行:filter、where

df.filter(col("count")<2).show(2)
df.where("count<2").show(2)

?7.去重 distinct()

//scala
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count()

//sql
select count(distinct(DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME)) from dfTable

?8.连接连个DataFrame,要求具有相同的Schema

df.union(newDF)

9.行排序 sort和orderBy 默认是升序

df.sort("count").show(3)
df.orderBy("count","DEST_COUNTRY_NAME").show(3)

import org.apache.spark.sql.functions.{desc,asc}

df.sort(desc("count")).show(2)
df.orderBy(desc("count")).show(2)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-29 23:09:14  更:2022-01-29 23:09:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:32:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码