【软件工程实践】Hive研究-Blog4
2021SC@SDUSC
本期研究内容简略介绍
本人负责的是负责的是将查询块QB转换成逻辑查询计划(OP Tree) 如下的代码出自apaceh-hive-3.1.2-src/ql/src/java/org/apache/hadoop/hive/ql/plan中,也就是我的分析目标代码。由于Blog3还尚未研究完MetastoreStatsConnector.java文件,本周我们就继续研究该文件源码。
MetastoreStatsConnector.java文件代码解析
我们首先附上整个java文件的源码。
package org.apache.hadoop.hive.ql.plan.mapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.hive.metastore.api.RuntimeStat;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsMap;
import org.apache.hadoop.hive.ql.optimizer.signature.RuntimeStatsPersister;
import org.apache.hadoop.hive.ql.stats.OperatorStats;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
class MetastoreStatsConnector implements StatsSource {
private static final Logger LOG = LoggerFactory.getLogger(MetastoreStatsConnector.class);
private final StatsSource ss;
private ExecutorService executor;
MetastoreStatsConnector(int cacheSize, int batchSize, StatsSource ss) {
this.ss = ss;
executor = Executors.newSingleThreadExecutor(
new BasicThreadFactory.Builder()
.namingPattern("Metastore-RuntimeStats-Loader-%d")
.daemon(true)
.build());
executor.submit(new RuntimeStatsLoader(cacheSize, batchSize));
}
private class RuntimeStatsLoader implements Runnable {
private int maxEntriesToLoad;
private int batchSize;
public RuntimeStatsLoader(int maxEntriesToLoad, int batchSize) {
this.maxEntriesToLoad = maxEntriesToLoad;
if (batchSize <= 0) {
this.batchSize = -1;
} else {
this.batchSize = batchSize;
}
}
@Override
public void run() {
int lastCreateTime = Integer.MAX_VALUE;
int loadedEntries = 0;
try {
do {
List<RuntimeStat> rs = Hive.get().getMSC().getRuntimeStats(batchSize, lastCreateTime);
if (rs.size() == 0) {
break;
}
for (RuntimeStat thriftStat : rs) {
loadedEntries += thriftStat.getWeight();
lastCreateTime = Math.min(lastCreateTime, thriftStat.getCreateTime() - 1);
try {
ss.putAll(decode(thriftStat));
} catch (IOException e) {
logException("Exception while loading runtime stats", e);
}
}
} while (batchSize > 0 && loadedEntries < maxEntriesToLoad);
} catch (TException | HiveException e) {
logException("Exception while reading metastore runtime stats", e);
}
}
}
@Override
public boolean canProvideStatsFor(Class<?> clazz) {
return ss.canProvideStatsFor(clazz);
}
@Override
public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
return ss.lookup(treeSig);
}
@Override
public void putAll(Map<OpTreeSignature, OperatorStats> map) {
if (map.size() == 0) {
return;
}
ss.putAll(map);
executor.submit(new RuntimeStatsSubmitter(map));
}
class RuntimeStatsSubmitter implements Runnable {
private Map<OpTreeSignature, OperatorStats> map;
public RuntimeStatsSubmitter(Map<OpTreeSignature, OperatorStats> map) {
this.map = map;
}
@Override
public void run() {
try {
RuntimeStat rec = encode(map);
Hive.get().getMSC().addRuntimeStat(rec);
} catch (TException | HiveException | IOException e) {
logException("Exception while persisting runtime stat", e);
}
}
}
private RuntimeStat encode(Map<OpTreeSignature, OperatorStats> map) throws IOException {
String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
RuntimeStat rs = new RuntimeStat();
rs.setWeight(map.size());
rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
return rs;
}
private Map<OpTreeSignature, OperatorStats> decode(RuntimeStat rs) throws IOException {
RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
return rsm.toMap();
}
public void destroy() {
executor.shutdown();
}
static void logException(String msg, Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
} else {
LOG.info(msg + ": " + e.getMessage());
}
}
}
我们在Blog3中已经研究完方法putAll了,那么我们就从类RuntimeStatsSubmitter这个类开始研究。
类构造器方法RuntimeStatsSubmitter
private Map<OpTreeSignature, OperatorStats> map;
public RuntimeStatsSubmitter(Map<OpTreeSignature, OperatorStats> map) {
this.map = map;
}
这是一个普通的构造方法,它先定义了一个Map类型的公共变量map,接着在构造方法中需要外部传进一个Map类型的参数,将这个传进的参数设置到变量map上。
方法run
@Override
public void run() {
try {
RuntimeStat rec = encode(map);
Hive.get().getMSC().addRuntimeStat(rec);
} catch (TException | HiveException | IOException e) {
logException("Exception while persisting runtime stat", e);
}
}
}
我们来看一下整个方法的外层结构,是用try-catch语句包含起来的,而其中捕获的异常类型包含了TException、HiveException和IOException,我们可以通过名字来理解HiveException是调用Hive的内部方法时出错所抛出的异常,IOException为输入输出时所发生的异常,那么这个TException是一个什么类型的异常呢?我们搜索TException,发现它是来自一个java类,该类的名称为org.apache.thrift.TException 。那么,什么是thirft呢?我们百度之后得到了如下答案: Thrift是Apache下的一个子项目,最早是Facebook的项目,后来Facebook提供给Apache作为开源项目,在官网上,Thrift被描述为“Scalable Cross-Language Services Implementation”,Thrift具有以下特征: 1.它有自己的跨机器的通信框架,并提供一套库。 2.它是一个代码生成器,按照它的规则,可以生成多种编程语言的通讯过程代码。
一般情况下的跨机器的通信框架都是跨软件平台的(Linux,windows), 而Thrift最特别之处在于它是跨语言的:例如,你可以用几乎所有流行语言(C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript等等)来实现通讯过程,这样做的好处就是你不用为编程语言发愁,如果服务器端与客户端都需要编写,选择你最拿手或项目规定的语言,就可以生成一个通讯框架;如果编写一个服务器端程序,定义好通讯规则(在Thrift中是.thrift文件)后,你所采用的服务器端实现语言不会影响到客户端,以后使用的人可以采用其他编程语言来实现客户端。
回到源码中来,我们看见语句 logException("Exception while persisting runtime stat", e); 其作用是抛出异常详细原因和建议,然后在控制台输出语句 "Exception while persisting runtime stat"
我们再来看try语句中的内容。首先是第一句语句 RuntimeStat rec = encode(map); 这里定义了一个RuntimeStat类型的变量rec,并赋予它值为调用encode方法后返回的内容。这个encode方法是一个什么方法?就是一个格式化所输入的内容的方法,相当于转码。对于它的具体解析,在下文我们会给出对应篇章来介绍。我们来看第二行代码:Hive.get().getMSC().addRuntimeStat(rec); Hive显然是调用了Hive的官方库,而后续的get和getMSC是函数的调用窗口,说明addRuntimeStat函数被封装了至少三层,这是一个十分保险的措施,虽然调用时多了几个调用,但是能够确保调用的函数是100%正确的,这也是我们在之后的编码过程中需要重点学习的思想。而这个addRuntimeStat方法,是用于在获取到的任务提交后所占用的CPU资源的反馈后边加上指定格式的内容,相当于String方法的合并两个String对象一样。至此,我们大概清楚了run方法的工作:将输入格式化编码,然后添加至任务CPU占用资源反馈信息末尾,并使用try-catch语句保证不因为其中的过程发生错误而导致整个进程崩溃的情况。
方法encode
private RuntimeStat encode(Map<OpTreeSignature, OperatorStats> map) throws IOException {
String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map));
RuntimeStat rs = new RuntimeStat();
rs.setWeight(map.size());
rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8)));
return rs;
}
我们可以看见,在语句 String payload = RuntimeStatsPersister.INSTANCE.encode(new RuntimeStatsMap(map)); 中调用了java库自带的encode方法。我们在网上查阅到了具体的资料:客户端在进行请求的时候,请求中可能会包含非ASCII码形式的内容,比如中文。而直接把中文放到请求中请求是不允许的,所以需要用encode编码,将非ASCII码内容转换成可以传输的字符。而不会被改变的内容有: 1.大写字母A-Z 2.小写字母a-z 3.数字 0-9 4.标点符 - _ . ! ~ * ’ (和 ,) 其中,encode的编码原理是这样子的:
1、将需要转换的内容(ASCII码形式之外的内容),用十六进制表示法转换出来,并在之前加上%开头 例如: 0x9c URLEncoder --> %9c
2、内容中的空格‘ ’ ,全部用+代替
回到源码,我们来看第二行做了什么事情。 RuntimeStat rs = new RuntimeStat(); 这里定义了一个显示CPU占用资源反馈的变量rs。类RuntimeStat我们已经接触多次,它是一个反馈信息的类。整个变量rs是下文调用函数的一个接口。
对于 rs.setWeight(map.size()); 中的setWeight方法,我们在Blog3就已经介绍了。Weight为权重值,也就是CPU运行任务的时间,这里将其设置为map的规模大小值。
对于rs.setPayload(ByteBuffer.wrap(payload.getBytes(Charsets.UTF_8))); 我们来看一下setPayload是一个什么方法。经过查阅资料,它是一个设置消息的方法,相当于消息体现在是空白的或者有东西的,我们现在使用该方法来覆盖它,而后面的Charsets.UTF_8显然是设置消息的字符编码为UTF_8格式。ByteBuffer.wrap就是设置缓冲区的函数,从可用内存中选择第一块符合条件的缓冲区域给本进程使用。本方法也大致清楚了。首先是调用官方库的格式化转码,然后设置一个临时变量用于返回,将临时变量的Weight和Payload参数均设置成需要的内容。
方法decode
private Map<OpTreeSignature, OperatorStats> decode(RuntimeStat rs) throws IOException {
RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class);
return rsm.toMap();
}
我们在了解完encode方法后,理解decode方法可以说是触类旁通。首先对于第一行 RuntimeStatsMap rsm = RuntimeStatsPersister.INSTANCE.decode(rs.getPayload(), RuntimeStatsMap.class); 我们参考encode的解析,很容易就会知道这是一个反解码的方法。意思就是比如中文的一对应%1234,那么就会把%1234转换为原来的中文的一。整个decode方法就已经清楚明了了。
方法destroy
public void destroy() {
executor.shutdown();
}
我们在Blog3的时候,已经初步接触了excutor了。而经过网上查阅资料后得知,executor.shutdown()就是在终止前允许执行以前提交的任务,完成任务后在回收资源,防止任务进行到一半时由于意外的关闭指令导致结果无法返回或计算任务中止。
方法logException
static void logException(String msg, Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug(msg, e);
} else {
LOG.info(msg + ": " + e.getMessage());
}
}
显而易见,这是一个处理异常的方法。而在方法里面的if语句中的判断,我们查阅资料,isDebugEnabled作用是进行预先判断,提升系统性能。 我们举个简单的例子: ?假设我们的日志级别设置为info,那这句话不会输出日志,但这个方法还是会调用(预判断作用)。要调用这个方法,必须提供参数。someMethod()方法返回的结果就是参数的一部分。假设someMethod()要执行n秒钟,n秒钟后,进入到debug()方法里; ?? 但是日志级别为info。结果是日志虽然没有输出,却花费了n秒钟来构造参数。很显然这里得不偿失的。尽管实际应用中几乎不可能有这种花n秒钟来构造这样一个参数的情况,但如果并发数大的话,这样写还是会影响系统的性能的。这个时候,就应该写成:
logger.debug("The money is " + someMethod());
如果debug的参数很简单的话,也可以直接写 logger.debug(message)的。官方的说法,执行一次logger.isDebugEnabled()这样的判断花费的时间大概是写日志时间的万分之一.虽然这个比例很小,但是,程序中的任何地方放到并发的环境下,我们就得重新考虑了。
我们来看一下isDebugEnabled()的源码:
public boolean isDebugEnabled() {
if(repository.isDisabled( Level.DEBUG_INT))
return false;
return Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel());
}
以及debug的源码:
public void debug(Object message) {
if(repository.isDisabled(Level.DEBUG_INT))
return;
if(Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.DEBUG, message, null);
}
}
那么,整个方法的意图和操作都显而易见了:提升系统性能,加速程序执行速度。
小结
经过本周的学习,再结合上周的学习,我们终于将MetastoreStatsConnector.java文件的源码全部解析完毕了,收获了很多知识,比如对于执行器excutor的各种使用方法,希望下个星期的学习能够带来更多收获。
|