IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark GraphX学习与理解 -> 正文阅读

[大数据]Spark GraphX学习与理解

Spark GraphX学习与理解

Spark GraphX 简介

Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。
图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面。
常用的应用有:在地图应用中找到最短路径、基于与他人的相似度图,推荐产品、服务、人际关系或媒体。

图的分布式或者并行处理其实是把图拆分成很多的子图,然后分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。

  • 顶点和边
    一般关系图中,事物为顶点,关系为边
    在这里插入图片描述
  • 有向图和无向图

在有向图中,一条边的两个顶点一般扮演者不同的角色,比如父子关系、页面A连接向页面B。
在一个无向图中,边没有方向,即关系都是对等的,比如qq中的好友。
GraphX中有一个重要概念,所有的边都有一个方向,那么图就是有向图,如果忽略边的方向,就是无向图。

  • 度表示一个顶点的所有边的数量
  • 出边是指从当前顶点指向其他顶点的边
  • 入边表示其他顶点指向当前顶点的边
  • 出度是一个顶点出边的数量
  • 入度是一个顶点入边的数量

GraphX 中的pregel函数

函数概述

Pregel是Google提出的用于大规模分布式图计算框架,常用来解决一下问题:

  • 图遍历(BFS)
  • 单源最短路径(SSSP)
  • PageRank计算(上一篇已经介绍过)

Pregel的计算由一系列迭代组成,称为supersteps。Pregel迭代过程(实现过程)如下:

  • 每个顶点从上一个superstep接收入站消息
  • 计算顶点新的属性值
  • 在下一个superstep中向相邻的顶点发送消息
  • 当没有剩余消息时,迭代结束

源码

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

参数说明
VD顶点的数据类型
ED边的数据类型
APregel message的类型
graph输入的图
initialMsg图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
maxIterations最大迭代次数
activeDirection规定了发送消息的方向(默认是出边方向:EdgeDirection.Out)
vprog节点接收该消息并将聚合后的数据和本节点进行属性的合并
sendMsg激活态的节点调用该方法发送消息
mergeMsg如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

案例:求5L到各点的最短路径

顶点 的状态有两种:

  • 钝化态【类似于休眠,不做任何事】
  • 激活态【干活】

顶点 能够处于激活态需要满足以下任意条件:

  • 成功收到消息
  • 成功发送了任何一条消息

代码实现:

object Test{
  def main(args: Array[String]): Unit = {
    //1、创建SparkContext
    val sparkConf = new SparkConf().setAppName("GraphxHelloWorld").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    //2、创建顶点
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String,Int))] = sparkContext.makeRDD(vertexArray)

    //3、创建边,边的属性代表 相邻两个顶点之间的距离
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(2L, 5L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sparkContext.makeRDD(edgeArray)


    //4、创建图(使用aply方式创建)
    val graph1 = Graph(vertexRDD, edgeRDD)

    /* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** */
    //被计算的图中 起始顶点id
    val srcVertexId = 5L
    val initialGraph = graph1.mapVertices{case (vid,(name,age)) => if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}

    //5、调用pregel
    val pregelGraph = initialGraph.pregel(
      Double.PositiveInfinity,
      Int.MaxValue,
      EdgeDirection.Out
    )(
      (vid: VertexId, vd: Double, distMsg: Double) => {
        val minDist = math.min(vd, distMsg)
        println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")
        minDist
      },
      (edgeTriplet: EdgeTriplet[Double,PartitionID]) => {
        if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}成功")
          Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
        } else {
          println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}失败")
          Iterator.empty
        }
      },
      (msg1: Double, msg2: Double) => math.min(msg1, msg2)
    )

    //6、输出结果
//      pregelGraph.triplets.collect().foreach(println)
//      println(pregelGraph.vertices.collect.mkString("\n"))

    //7、关闭SparkContext
    sparkContext.stop()
  }
}

输出结果:

//------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------
顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
//------------------------------------------ 第一次迭代 ------------------------------------------
顶点3 给 顶点6 发送消息 Infinity失败
顶点5 给 顶点6 发送消息 3.0成功
顶点2 给 顶点4 发送消息 Infinity失败
顶点4 给 顶点1 发送消息 Infinity失败
顶点5 给 顶点3 发送消息 8.0成功
顶点2 给 顶点1 发送消息 Infinity失败
顶点2 给 顶点5 发送消息 Infinity失败
顶点3 给 顶点2 发送消息 Infinity失败

顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
顶点6,属性Infinity,收到消息3.0,合并后的属性3.0
//------------------------------------------ 第二次迭代 ------------------------------------------
顶点3 给 顶点2 发送消息 12.0成功
顶点3 给 顶点6 发送消息 11.0失败

顶点2,属性Infinity,收到消息12.0,合并后的属性12.0
//------------------------------------------ 第三次迭代 ------------------------------------------
顶点2 给 顶点1 发送消息 19.0成功
顶点2 给 顶点4 发送消息 14.0成功
顶点2 给 顶点5 发送消息 14.0失败

顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
//------------------------------------------ 第四次迭代 ------------------------------------------
顶点4 给 顶点1 发送消息 15.0成功

顶点1,属性19.0,收到消息15.0,合并后的属性15.0
//------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------

过程分析:
调用pregel方法之前,先把图的各个顶点的属性初始化为如下图所示:顶点5到自己的距离为0,所以设为0,其他顶点都设为 正无穷大Double.PositiveInfinity。

1. 当调用pregel方法开始:

首先,所有顶点都将接收到一条初始消息initialMsg,使所有顶点都处于激活态(红色标识的节点)。
在这里插入图片描述

2. 第一次迭代开始:

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。
发送成功的只有两条边:
5—>3(0+8<Double.Infinity , 成功),
5—>6(0+3<Double.Infinity , 成功)
3—>2(Double.Infinity+4>Double.Infinity , 失败)
3—>6(Double.Infinity+3>Double.Infinity , 失败)
2—>1(Double.Infinity+7>Double.Infinity , 失败)
2—>4(Double.Infinity+2>Double.Infinity , 失败)
2—>5(Double.Infinity+2>Double.Infinity , 失败)
4—>1(Double.Infinity+1>Double.Infinity , 失败)。

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5 成功地分别给顶点3 和 顶点6 发送了消息,顶点3 和 顶点6 也成功地接受到了消息。所以 此时只有5,3,6 三个顶点处于激活态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息 与 自身的属性合并。如下图所示。到此第一次迭代结束。

3. 第二次迭代开始:

顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 下图所示至此第二次迭代结束。
在这里插入图片描述

4. 第三次迭代开始:

顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。下图所示至此第三次迭代结束。
在这里插入图片描述

5. 第四次迭代开始:

顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并 。
在这里插入图片描述

6. 第五次迭代开始:

顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束。
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:53:50  更:2021-08-06 09:56:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 18:46:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码