整体思路
rank和row_number使用起来的区别就是相同的排序,rank是相同的,并且排名不是连续的,核心算法就是UDAF,详细见。。。
了解开窗函数参考https://blog.csdn.net/samll_DATA/article/details/119598024?spm=1001.2014.3001.5501
窗口函数和window字句
关键是理解ROWS BETWEEN含义,也叫做WINDOW子句: PRECEDING:往前 FOLLOWING:往后 CURRENT ROW:当前行 UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING:表示到后面的终点
支持window字句的源码分析
sum字句
-
GenericUDAFSumLong buff 存的是long:就是当前分区下所有的累加和 -
StreamingState extends AbstractAggregationBuffer : result存的是(list)分区每一行的结果,比如就是 前10行到,前两行的结果 -
SumAvgStreamingState extends StreamingState:intermediateVals存的是(list)用来计算结果的,比如当前结果:S[x…y] = S[y] - S[x-1],也就是每一行从开头的累加和,窗口的和只用通过减法就可以得到. -
核心算法思想:
S
n
S_{n}
Sn?代表分区0到numRows=n的和,如果保证,m-n刚好就是窗口windowSize的大小,那么
S
m
?
S
n
S_{m}-S_{n}
Sm??Sn?就代表某一行的结果(就是某个窗口的和)。具体实现如下 1、这里也考虑一个相对复杂的特殊情况,CASE:X preceding and Y preceding,这里一般分三种情况a、窗口没有数据;b、窗口只有部分数据;c、窗口全部数据 2、来数据o,得到累计和sum,减去并移除intermediateVals[0],得到结果s,result.add(s),intermediateVals.add(sum) 3、回到第2,直到没有数据 4、ternite 方法补充数据
核心算法代码实现主要涉及两个方法iterate和terminate
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
SumAvgStreamingState ss = (SumAvgStreamingState) agg;
wrappedEval.iterate(ss.wrappedBuf, parameters);
if (ss.numRows == 0) {
for (int i = wFrameDef.getEnd().getRelativeOffset(); i < 0; i++) {
ss.results.add(null);
}
}
if (ss.hasResultReady()) {
ss.results.add(getNextResult(ss));
}
if (!wFrameDef.isStartUnbounded()
&& ss.numRows + 1 >= wFrameDef.getStart().getRelativeOffset()) {
ss.intermediateVals.add(getCurrentIntermediateResult(ss));
}
ss.numRows++;
}
public Object terminate(AggregationBuffer agg) throws HiveException {
SumAvgStreamingState ss = (SumAvgStreamingState) agg;
Object o = wrappedEval.terminate(ss.wrappedBuf);
for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
if (ss.hasResultReady()) {
ss.results.add(getNextResult(ss));
}
ss.numRows++;
}
for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
if (ss.hasResultReady()) {
ss.results.add(null);
}
ss.numRows++;
}
return o;
}
max 开窗函数的字句
-
维护一个resultList用来存需要展示的结果和一个maxChain<object<o,numRows>>队列,队列元素[object,numRows]是一个长度为2数组的数组,数组里存放的分别是当前行的元素和对应的行号。 -
算法的核心就是维护这个maxchain,使得这个队列,从头到尾是递减的,numRows是递增的,并且可见范围保持在当前行的窗口,也就是,当前行的最大值一定是在个队列的最头部元素,具体实现如下: 1、当第numRows行的新数据o来了以后,从队尾队首遍历,删除比o小的元素,直到碰到比自己大跳出 2、如果当前是numRows = 0 ,需要准备一下条件,X preceding and Y preceding 3、只要是无边界条件或者队列是空的都要添加此元素到队列 4、获得队首元素,为最大值展示 5、numRows++ 6、判断队首的元素是不是,下一行还是在窗口内可见,不是话就删除队首的元素 7、接收下一个元素,从第一步开始 -
最后对于X following and Y following和 X preceding and Y following需要补充数据
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
State s = (State) agg;
Object o = parameters[0];
while (!s.maxChain.isEmpty()) {
if (!removeLast(o, s.maxChain.getLast()[0])) {
break;
} else {
s.maxChain.removeLast();
}
}
if (s.numRows == 0) {
for (int i = wFrameDef.getEnd().getRelativeOffset(); i < 0; i++) {
s.results.add(null);
}
}
if (!wFrameDef.isStartUnbounded() || s.maxChain.isEmpty()) {
o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o,
inputOI(), ObjectInspectorCopyOption.JAVA);
s.maxChain.addLast(new Object[] { o, s.numRows });
}
if (s.hasResultReady()) {
s.results.add(s.maxChain.getFirst()[0]);
}
s.numRows++;
int fIdx = (Integer) s.maxChain.getFirst()[1];
if (!wFrameDef.isStartUnbounded()
&& s.numRows >= fIdx + wFrameDef.getWindowSize()) {
s.maxChain.removeFirst();
}
}
疑问:case: X preceding and y preceding 怎么处理多余的数据
|