Common Join
Common join的操作被编译成MapReduce任务。
Map阶段把join操作的on条件字段作为输出的key,把select、where的字段和表名拼接成输出的value。下图是一个简单的案例: 
更通用的: 
可见有三个阶段
- Mapper从表中读取数据,并依据规则把k-v键值对发送到中间文件。
- 中间文件在Shuffle阶段对这些键值对进行排序和合并。
- Reducer利用排序,把相同key的键值对发送到同一个reduce方法中进行join
Map Join
Map Join的目的是减少Shuffle和Reducer阶段的代价,仅在Map阶段进行Join。这样不仅可以提高join的速度,由于没有shuffle,进而可以避免数据倾斜的问题
这就要求参与join的表至少要有一个足够小到可以装进内存,让所有的Mapper可以把该表读进MapTask的运行时内存。
但是如果成千上万个Mapper同时去HDFS中读取这个小表,效率很低,容易导致Mapper在读取队列中操作超时,因此Hive提出了分布式缓存技术的解决方案。
使用分布式缓存
优化的基本思想是在原始Join操作的MapReduce任务创建之前创建一个新的本地MapReduce任务,这个任务是将小表数据从HDFS上读取到内存中的hash表中,读取完成后,将hash表序列化为文件,随后将该文件上传到Hadoop的分布式缓存中,分布式缓存会将该文件发送到每个Mapper所在的本地磁盘上。
因此所有Mapper都可以将这个hash表文件加载到MapTask的运行内存中去。
优化后,从HDFS读取小表的操作只需要进行一次。并且如果多个Mapper在同一台机器上,他们读取的是同一份小表的hash表序列化文件。

大小表的区分
此前Hive用户需要在查询中给出提示来指定哪个表是小表,用户体验很差。随后Hive支持自动尝试将Common Join转为Map Join,Hive自动识别哪些表可以作为小表,并将这些小表放入内存中。
Hive查询处理在编译时不知道输入文件的大小,并且有可能子查询的表也会生成中间表,因此Hive只能在执行期间计算出输入文件的大小。

如上图所示,左侧是Common Join,不做过多解释。
右侧流程是Map Join执行流程。以select * from src1 x join src2 y on x.key=y.key 为例。因为表src1和src2都可能是大表,所以处理器生成2个Map Join任务,第一个假设src1是大表,第二个假设src2是大表,这两个任务都放在Condition Task中,此外还有Common Join任务,在执行本阶段任务时,会先通过计算表大小决定前两个Map Join是否可以执行,如果都无法执行则执行Common Join。通过这种机制可以动态地将Common Join转化为Map Join。
mmon Join。通过这种机制可以动态地将Common Join转化为Map Join。
参考资料:https://cloud.tencent.com/developer/article/1481780
|