IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Data.Analysis.with.Python.and.PySpark:PySpark的第一个程序 -> 正文阅读

[大数据]Data.Analysis.with.Python.and.PySpark:PySpark的第一个程序

命令行输入:pyspark

设置pyspark 启动时所使用的python版本参考:Data.Analysis.with.Python.and.PySpark:准备_lm19770429的专栏-CSDN博客The book focuses on Spark version 3.2How PySpark worksUnder the hood, it looks more like what’s on the right: you have some workbenches that some workers are assigned to. The workbenches are like the computers in our Spark cluster: there is a fixed ahttps://blog.csdn.net/lm19770429/article/details/123357275?spm=1001.2014.3001.5501

?The SparkSession entry point

通过使用getOrCreate()方法,您的程序将在交互式和批处理模式下工作,避免创建新的SparkSession(如果已经存在)。

?配置日志级别:

spark.sparkContext.setLogLevel("KEYWORD")

Keyword Signification
OFF No logging at all (not recommended).
FATAL Only fatal errors. A fatal error will crash your Spark cluster.
ERROR Will show FATAL, as well as other recoverable errors.
WARN Add warnings (and there are quite a lot of them).
INFO Will give you runtime information, such as repartitioning and data recovery

DEBUG Will provide debug information on your jobs.
TRACE Will trace your jobs (more verbose debug logs). Can be quite informative but very annoying.
ALL Everything that PySpark can spit, it will spit. As useful as OFF.

Reading data into a data frame with spark.read

?

PySpark reads your data
PySpark can accommodate the different ways you can process data. Under the hood,
spark.read.csv() will map to spark.read.format('csv').load(), and you may
encounter this form in the wild. I usually prefer using the direct csv method as it provides a handy reminder of the different parameters the reader can take.
orc and parquet are also data formats that are especially well suited for big data
processing. ORC (which stands for “optimized row columnar”) and Parquet are competing data formats that pretty much serve the same purpose. Both are open sourced
and now part of the Apache project, just like Spark.
PySpark defaults to using Parquet when reading and writing files, and we’ll use this
format to store our results throughout the book. I’ll provide a longer discussion about
the usage, advantages, and trade-offs of using Parquet or ORC as a data format in
chapter 6

读取txt:

>>> book=spark.read.text("./data/gutenberg_books/1342-0.txt")
>>> book
DataFrame[value: string]

您可能希望更清楚地显示模式。PySpark提供printSchema()以树形形式显示模式。

当我不记得要应用的确切方法时,我非常喜欢在对象上使用dir()

dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_activeSession', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_dataframe', '_create_from_pandas_with_arrow', '_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped', 'builder', 'catalog', 'conf', 'createDataFrame', 'getActiveSession', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version']

如果您不确定函数、类或方法的正确用法,可以打印_doc__;属性,或者,对于使用IPython的用户,可以使用后面的问号(如果需要更多详细信息,可以使用两个问号)。

 print(spark.__doc__)

从结构到内容:用show()探索我们的数据框架

默认情况下,它将显示20行并截断长值。

 book.show()

show()方法有三个可选参数:
? n可以设置为任何正整数,并将显示该行数。
? truncate如果设置为true,将截断列以仅显示20个字符。如果设置为False,它将显示整个长度,或任何正整数以截断为特定数量的字符。
? vertical接受一个布尔值,当设置为True时,将把每条记录显示为一个小表。如果您需要详细检查记录,这是一个非常有用的选项。

book.show(10, truncate=50)

可选主题:非惰性Spark

也就是说,有些时候,尤其是在学习时,您希望在每次转换后对数据帧进行评估(我们称之为急切评估)。

如果要在shell中使用即时求值,可以在shell中粘贴以下代码:

from pyspark.sql import SparkSession
spark = (SparkSession.builder
.config("spark.sql.repl.eagerEval.enabled", "True")
.getOrCreate())

简单的列转换:从一个句子移动到一个单词列表

from pyspark.sql.functions import split
lines = book.select(split(book.value, " ").alias("line"))
lines.show(5)

Selecting specific columns using select()

最基本的转变是特征,在那里你可以准确地返回提供给你的东西。如果您过去使用过SQL,您可能会认为这听起来像一个SELECT语句

book.select(book.value)

PySpark为其数据帧中的每一列提供了一个指向该列的点符号。

PySpark提供了多种选择列的方法。我将在下一个列表中显示四种最常见的方法。

from pyspark.sql.functions import col
book.select(book.value)
book.select(book["value"])
book.select(col("value"))
book.select("value")

?Transforming columns: Splitting a string into a list of words

?split函数将字符串列转换为数组列,其中包含一个或多个字符串元素。

重命名列:alias和WithColumnRename

当使用指定要显示哪些列的方法时,如select()方法,请使用alias()。

如果只想重命名列而不更改数据框的其余部分,请使用WithColumnRename。重新命名。请注意,如果该列不存在,PySpark会将此方法视为无操作,不会执行任何操作。

# This looks a lot cleaner
lines = book.select(split(book.value, " ").alias("line"))
# This is messier, and you have to remember the name PySpark assigns
automatically
lines = book.select(split(book.value, " "))
lines = lines.withColumnRenamed("split(value, , -1)", "line")

重塑数据:将列表分解成行

from pyspark.sql.functions import explode, col
words = lines.select(explode(col("line")).alias("word"))
words.show(15)
# +----------+
# | word|
# +----------+
# | The|
# | Project|
# | Gutenberg|
# | EBook|
# | of|
# | Pride|
# | and|
# |Prejudice,|
# | by|
# | Jane|
# | Austen|
# | |
# | This|
# | eBook|
# | is|
# +----------+
# only showing top 15 rows

Working with words: Changing case and removing punctuation

Lower the case of the words in the data frame

from pyspark.sql.functions import lower
words_lower = words.select(lower(col("word")).alias("word_lower"))
words_lower.show()

clean our words of any punctuation and other non-useful characters

我们只保留使用正则表达式的字母

Using regexp_extract to keep what looks like a word

from pyspark.sql.functions import regexp_extract
words_clean = words_lower.select(
regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word")
)

words_clean.show()

?If you are interested in building your own, the RegExr (https://regexr.com/) website
is really useful, as well as the Regular Expression Cookbook by Steven Levithan and
Jan Goyvaerts (O’Reilly, 2012).

Filtering rows

PySpark provides not one, but two identical methods to perform this task. You can use either .filter() or its alias .where().

Filtering rows in your data frame using where or filter:?using “not equal,” or !=

words_nonull = words_clean.filter(col("word") != "")
words_nonull.show()

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:36:15  更:2022-03-10 22:38:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:14:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码