Join
表信息和sql
表A : table_A,字段a1,a2,No 表B : table_B,字段b1,b2,No select a1,a2,b1,b2 from table_A t_A join table_B t_B on t_A.No=t_B.No ;
总体流程
流程的设计
step1 : 遍历table_A 表的所有记录,以过滤条件中的字段No的值进行分组,每个No值对于的A组数据,遍历出来,待用 step2 : 遍历table_B 表的所有记录,同A的处理方式
拿出A组数据去匹配B组数据,以A组的No值去比较,匹配出满足A中No=x时候,B中No也等于x的值的哪些数据行,(过滤 filter)
然后根据select a1,a2,b1,b2 过滤出匹配到的记录行中的几个字段(裁剪 project)
考虑性能损耗点
算法上是 遍历A ,遍历B,取相同key值后,join 实现上是 A和B的数据特点:无序,分布存储在多台数据节点上
1、抽取数据损耗时间:根据No这个value值作为分区的,shuffle read/write ,同时shuffle A,B表,慢 2、无序数据如果能先排序好(A表B表各自根据No排序),则遍历过程中,不会重复从第一行全量变量抽取,而是按顺序抽取,减少重复, 假设排好序的A表中,No取x时,A表的指针从 [10000行到20000行]都是X,No取y时,A表指针直接从20001行开始取,B表同理 》》sort merge join 增加sort merge 过程 3、计算代码和数据运行的位置, 如果两个表都很大,则计算代码分布式,数据shuffle到各个物理节点上跑 如果两个表中有一个表比较小,则数据移动,每个运行的物理节点上,数据都加载到内存里面,直接从内存里面取优于从磁盘取,加快运行速度,去掉了shuffle过程》》》 广播 broadcast join 去掉了shuffle read 过程 增加数据通过网卡移动分发整个表的过程
实现方式一:hash join (小表join小表)
单机就能满足:小表都在一台机器上,直接加载到机器上跑,hash 表记录所有记录
- 确定 Build Table 以及 Probe Table:这个概念比较重要,Build Table 使用 join key 构建 Hash Table,而 Probe Table 使用 join key 进行探测,探测成功就可以 join 在一起。通常情况下,小表会作为 Build Table,大表作为 Probe Table。此事例中 item 为 Build Table,order 为 Probe Table。
- 构建 Hash Table:依次读取 Build Table(小表)的数据,对于每一行数据根据 join key(小表.No)进行 hash,hash 到对应的 Bucket,生成 hash table 中的一条记录。数据缓存在内存中,如果内存放不下需要 dump 到外存。
- 探测:再依次扫描 Probe Table(大表)的数据,使用相同的 hash 函数映射 Hash Table 中的记录,映射成功之后再检查 join 条件(小表.No = 大表.No),如果匹配成功就可以将两者 join 在一起。
复杂度计算
只需要扫描一次就行 O(tableA+tableB)
实现方式二 : broadcast join (大表 join 极小表)
要广播的表数据移动过程
step 1: collect到driver端 step 2: 由driver分发到所有的executor上
好处:
减少了shuffle 过程,使得所有计算在自己节点上运行,避免通过网络拉取数据匹配的过程
弊端:
如果要广播的表比较大,那么driver端压力较大 每个executor要扫描整个广播表,增大了内存消耗
相关参数:
被广播的表需要小于 spark.sql.autoBroadcastJoinThreshold 所配置的值,默认是10M (或者加了broadcast join的hint)
注意点
基表不能被广播,比如 left outer join 时,只能广播右表
实现方式三: shuffle hash join (大表join 小表)
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:
shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle。 hash join阶段:每个分区节点上的数据单独执行单机hash join算法。
https://blog.csdn.net/zyzzxycj/article/details/83414044
tableA join tableB的过程(shuffle+hash join)
相关参数:
Shuffle Hash Join的条件有以下几个: 1、分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M 2、基表不能被广播,比如left outer join时,只能广播右表 3、一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)
实现方式四 : sort merge join (大表join大表)
tableA join tableB的过程(shuffle sort merge)
https://www.pianshen.com/article/644512590/
整个过程分为三个步骤:
- shuffle 阶段:
将两张大表根据 join key 进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理。 2. sort 阶段:
对单个分区节点的两表数据,分别进行排序。 3. merge 阶段:
对排好序的两张分区表数据执行 join 操作。join 操作很简单,分别遍历两个有序序列,碰到相同 join key 就 merge 输出,否则取更小一边。如下图所示:
比较
这几种 Join 的代价关系:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join)
数据仓库设计时最好避免大表与大表的 join 查询,SparkSQL 也可以根据内存资源、带宽资源适量将参数 spark.sql.autoBroadcastJoinThreshold 调大,让更多 join 实际执行为 broadcast hash join。 https://blog.51cto.com/u_15127500/3790440
参考资料 https://blog.51cto.com/u_15127500/3790440 https://www.pianshen.com/article/644512590/ https://blog.csdn.net/zyzzxycj/article/details/83414044 https://cloud.tencent.com/developer/article/1005502 https://www.cnblogs.com/suanec/p/7560399.html
|