IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> pyspark 使用广播变量进行大表和小表join -> 正文阅读

[大数据]pyspark 使用广播变量进行大表和小表join

pyspark 大表和小表join,使用广播变量,并对广播变量更新

from pyspark import SparkConf, SparkContext, SQLContext

# import org.apache.spark.sql.functions.broadcast
# 给定节点,根据其信息进行扩展,找到各个层级扩展的节点。
# df_edges_all:全部的(边)[source,target,xxx,xx],loop_nodes:[nodes],loop_num :循环次数, sqlContext: SqlContext的实例
def extend_fun(sc,sqlContext, df_edges_all,loop_nodes, loop_num):
    # loop_nodes = ""
    # 进行注册
    df_edges_all.createTempView("nodes_edges_all")
    loop_nodes.createTempView("loop_nodes")
    #广播变量

    # 进行扩展
    for i in range(loop_num):
    # 广播变量更新
        print(i)
        bd = set()
        brodcast = sc.broadcast("")
        for index, col in loop_nodes.toPandas().iterrows():
            bd.add(col["nodes"])
        brodcast.unpersist(False)
        brodcast = sc.broadcast(bd)
        df_edges_all_filter = df_edges_all.filter(
            df_edges_all["Source"].isin(brodcast.value) | df_edges_all["Target"].isin(brodcast.value))
        df_edges_all_filter.registerTempTable("loop_edges")
        # df_edges_all_filter.show(10)
        if i < loop_num-1:
            # print(i)
            loop_nodes = sqlContext.sql("""
                                        select distinct nodes from 
                                        (select Source nodes from loop_edges where Source is not null and Source<>''
                                        union 
                                        select Target nodes from loop_edges t  where Target is not null and Target<>'')
                                        """)
            # print(loop_nodes.count())
            # loop_nodes.registerTempTable("")

        sqlContext.sql("select count(1) from loop_edges").show(10)
    return df_edges_all_filter

if __name__ == '__main__':
    conf = SparkConf().setMaster('local[2]').set("spark.executor.memory", "3g")
    sc = SparkContext.getOrCreate(conf)
    sqlContext = SQLContext(sc)
    # 获取所有的边
    df_edges_all = sqlContext.read.option("header", "true").option("inferSchema", "true").csv("E:/AAA-2/过程/(边)1.csv")
    # df_edges_all.createTempView("nodes_edges_all")

    # 获取初始的成员
    df_nodes = sqlContext.read.option("header", "true").option("inferSchema", "true").csv("E:/AAA-2/aaa.csv")
    # df_nodes.show(10)
    # loop_num = sc.accumulator(3)
    df_edges_all_filter = extend_fun(sc,sqlContext,df_edges_all,df_nodes,5)
    df_edges_all_filter.show(10)
    sc.stop()
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-19 12:07:10  更:2021-08-19 12:08:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 12:59:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码