2021SC@SDUSC
概述
执行引擎将MapReduce作业以排序顺序提交到Hadoop。这些MapReduce作业在Hadoop上执行,产生所需的结果。
AccumulatorOptimizerUtil类
addAccumulator()方法
public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) {
if (pos == null || pos.size() == 0) {
return;
}
PhysicalOperator po_package = pos.get(0);
if (!po_package.getClass().equals(POPackage.class)) {
return;
}
Packager pkgr = ((POPackage) po_package).getPkgr();
if (!pkgr.getClass().equals(Packager.class)) {
return;
}
}
?if (pos == null || pos.size() == 0) { ? ? ? ? ? ? return; ? ? ? ? }
检查传入的pos是否是一个map-reduce 工程。
PhysicalOperator po_package = pos.get(0); ? ? ? ? if (!po_package.getClass().equals(POPackage.class)) { ? ? ? ? ? ? return; ? ? ? ? }
判断传入的pos的第0行的类型是否是POPackage类型。
Packager pkgr = ((POPackage) po_package).getPkgr(); ? ? ? ? if (!pkgr.getClass().equals(Packager.class)) { ? ? ? ? ? ? return; ? ? ? ? }
判断po_package是否属于Packager包。
CombinerOptimizerUtil类
addCombiner方法
public static void addCombiner(PhysicalPlan mapPlan, PhysicalPlan reducePlan,
PhysicalPlan combinePlan, CompilationMessageCollector messageCollector,
boolean doMapAgg) throws VisitorException {
List<PhysicalOperator> mapLeaves = mapPlan.getLeaves();
if (mapLeaves == null || mapLeaves.size() != 1) {
messageCollector.collect("Expected map to have single leaf", MessageType.Warning,
PigWarning.MULTI_LEAF_MAP);
return;
}
PhysicalOperator mapLeaf = mapLeaves.get(0);
if (!(mapLeaf instanceof POLocalRearrange)) {
return;
}
POLocalRearrange rearrange = (POLocalRearrange)mapLeaf;
List<PhysicalOperator> reduceRoots = reducePlan.getRoots();
if (reduceRoots.size() != 1) {
messageCollector.collect("Expected reduce to have single root", MessageType.Warning,
PigWarning.MULTI_ROOT_REDUCE);
return;
}
}
List<PhysicalOperator> mapLeaves = mapPlan.getLeaves(); ? ? ? ? if (mapLeaves == null || mapLeaves.size() != 1) { ? ? ? ? ? ? messageCollector.collect("Expected map to have single leaf", MessageType.Warning, ? ? ? ? ? ? ? ? ? ? PigWarning.MULTI_LEAF_MAP); ? ? ? ? ? ? return; ? ? ? ? }
检查传入的MapReduce作业mapPlan是否包含聚集和遍历过程。
POLocalRearrange rearrange = (POLocalRearrange)mapLeaf
获取Map中的POLocalRearrange类型数据
总结
本周分析了执行引擎中的几个工具类中的方法,了解了一些执行引擎中的一些操作过程。
|