IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark[2]: 关于partition的相关操作(帮助理解RDD) -> 正文阅读

[大数据]spark[2]: 关于partition的相关操作(帮助理解RDD)

准备工作

# PySpark is the Spark API for Python. we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark context class
# sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark IBM Cloud Example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

创建RDD

nums = [i for i in range(10)]

rdd = sc.parallelize(nums)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

使用getNumPartitions()查看分区数量,使用glom()查看分区内容。

output:

在这里插入图片描述
在函数parallelize()指定分区数量

  1. 分区为2
rdd = sc.parallelize(nums, 2)
    
print("Default parallelism: {}".format(sc.defaultParallelism))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

output:
在这里插入图片描述

  1. 分区为15
rdd = sc.parallelize(nums, 15)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

output:

在这里插入图片描述

使用partitionBy()函数

其中,数据集需要是具有键/值对的元组,因为默认分区器使用键的哈希值将元素分配给分区。


rdd = sc.parallelize(nums) \
        .map(lambda el: (el, el)) \
        .partitionBy(2) \
        .persist()
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

j=0
for i in rdd.glom().collect():
    j+=1
    print("partition: " + str(j) + " "+ str(i))

output:

在这里插入图片描述

显式分区

准备数据:

transactions = [
    {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
    {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
    {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
    {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
    {'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
    {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
    {'name': 'Pierre', 'amount': 120, 'country': 'France'},
    {'name': 'Frank', 'amount': 180, 'country': 'France'}
]

为了使得每一个国家在一个node中,我们使用自定义分区:

# Dummy implementation assuring that data for each country is in one partition
def country_partitioner(country):
    return hash(country)% (10**7+1)
    #return portable_hash(country)

rdd = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(3, country_partitioner)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

print("\n--\n")
for i, j in enumerate(rdd.glom().collect()):
    print("\npartition: " + str(i+1) + "\n"+ str(j))

output:

在这里插入图片描述
利用partition做计算

使用mapPartitions()函数可以单独对每个partition进行map运算。

def sum_sales(iterator):
    yield sum(transaction[1]['amount'] for transaction in iterator)

by_country = sc.parallelize(transactions) \
        .map(lambda el: (el['country'], el)) \
        .partitionBy(5, country_partitioner)
    
# Sum sales in each partition
sum_amounts = by_country \
    .mapPartitions(sum_sales) \
    .collect()

print("Total sales for each partition: {}".format(sum_amounts))

output:

每个国家对应的amounts总数

在这里插入图片描述


使用dataframe查看partition效果

df = spark.createDataFrame(transactions)

for i, j in enumerate(df.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

output:
在这里插入图片描述
可知数据分布在两个partition。

使用repartition函数

可以直接指定列名:

# Repartition by column
df2 = df.repartition(10,"country")

print("\nAfter 'repartition()'")
print("Number of partitions: {}".format(df2.rdd.getNumPartitions()))
print("Partitioner: {}".format(df2.rdd.partitioner))
print("Partitions structure: {}".format(df2.rdd.glom().collect()))

outoput:

在这里插入图片描述
查看每个partition中的数据:

for i, j in enumerate(df2.rdd.glom().collect()):
    print("partition: " + str(i+1) + "\n"+ str(j))

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-12 00:04:27  更:2022-01-12 00:05:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 14:49:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码