IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hive开窗函数源码 -> 正文阅读

[大数据]Hive开窗函数源码

整体思路

rank和row_number使用起来的区别就是相同的排序,rank是相同的,并且排名不是连续的,核心算法就是UDAF,详细见。。。

了解开窗函数参考https://blog.csdn.net/samll_DATA/article/details/119598024?spm=1001.2014.3001.5501

窗口函数和window字句

关键是理解ROWS BETWEEN含义,也叫做WINDOW子句:
PRECEDING:往前
FOLLOWING:往后
CURRENT ROW:当前行
UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING:表示到后面的终点

支持window字句的源码分析

sum字句

  1. GenericUDAFSumLong buff 存的是long:就是当前分区下所有的累加和

  2. StreamingState extends AbstractAggregationBuffer : result存的是(list)分区每一行的结果,比如就是 前10行到,前两行的结果

  3. SumAvgStreamingState extends StreamingState:intermediateVals存的是(list)用来计算结果的,比如当前结果:S[x…y] = S[y] - S[x-1],也就是每一行从开头的累加和,窗口的和只用通过减法就可以得到.

  4. 核心算法思想: S n S_{n} Sn?代表分区0到numRows=n的和,如果保证,m-n刚好就是窗口windowSize的大小,那么 S m ? S n S_{m}-S_{n} Sm??Sn?就代表某一行的结果(就是某个窗口的和)。具体实现如下
    1、这里也考虑一个相对复杂的特殊情况,CASE:X preceding and Y preceding,这里一般分三种情况a、窗口没有数据;b、窗口只有部分数据;c、窗口全部数据
    2、来数据o,得到累计和sum,减去并移除intermediateVals[0],得到结果s,result.add(s),intermediateVals.add(sum)
    3、回到第2,直到没有数据
    4、ternite 方法补充数据

核心算法代码实现主要涉及两个方法iterate和terminate

public void iterate(AggregationBuffer agg, Object[] parameters)
    throws HiveException {
  SumAvgStreamingState ss = (SumAvgStreamingState) agg;

  wrappedEval.iterate(ss.wrappedBuf, parameters);
  // 添加先置条件,如果是X FOLLOWIGNG and Y FOLLOWIGNG,则会在terminate方法中result结果后边插入null值
  // We need to insert 'null' before processing first row for the case: X preceding and y preceding
  if (ss.numRows == 0) {
    for (int i = wFrameDef.getEnd().getRelativeOffset(); i < 0; i++) {
      ss.results.add(null);
    }
  }

  // Generate the result for the windowing ending at the current row
  if (ss.hasResultReady()) {
    ss.results.add(getNextResult(ss));
  }
  if (!wFrameDef.isStartUnbounded()
      && ss.numRows + 1 >= wFrameDef.getStart().getRelativeOffset()) { // X是FOLLOWING的情况,之前的减的话,加进去减的话就没意义
    ss.intermediateVals.add(getCurrentIntermediateResult(ss));
  }

  ss.numRows++;
}
public Object terminate(AggregationBuffer agg) throws HiveException {
  SumAvgStreamingState ss = (SumAvgStreamingState) agg;
  Object o = wrappedEval.terminate(ss.wrappedBuf);

  // After all the rows are processed, continue to generate results for the rows that results haven't generated.
  // For the case: X following and Y following, process first Y-X results and then insert X nulls.
  // For the case X preceding and Y following, process Y results.
  for (int i = Math.max(0, wFrameDef.getStart().getRelativeOffset()); i < wFrameDef.getEnd().getRelativeOffset(); i++) {
    if (ss.hasResultReady()) {
      ss.results.add(getNextResult(ss));
    }
    ss.numRows++;
  }//X following and Y following和 X preceding and Y following


  for (int i = 0; i < wFrameDef.getStart().getRelativeOffset(); i++) {
    if (ss.hasResultReady()) {
      ss.results.add(null);
    }
    ss.numRows++;
  }//X following and Y following会出现

  return o;// 不太明白返回值为啥是一个元素,而不是一个list,和row_number一样
}

max 开窗函数的字句

  1. 维护一个resultList用来存需要展示的结果和一个maxChain<object<o,numRows>>队列,队列元素[object,numRows]是一个长度为2数组的数组,数组里存放的分别是当前行的元素和对应的行号。

  2. 算法的核心就是维护这个maxchain,使得这个队列,从头到尾是递减的,numRows是递增的,并且可见范围保持在当前行的窗口,也就是,当前行的最大值一定是在个队列的最头部元素,具体实现如下:
    1、当第numRows行的新数据o来了以后,从队尾队首遍历,删除比o小的元素,直到碰到比自己大跳出
    2、如果当前是numRows = 0 ,需要准备一下条件,X preceding and Y preceding
    3、只要是无边界条件或者队列是空的都要添加此元素到队列
    4、获得队首元素,为最大值展示
    5、numRows++
    6、判断队首的元素是不是,下一行还是在窗口内可见,不是话就删除队首的元素
    7、接收下一个元素,从第一步开始

  3. 最后对于X following and Y following和 X preceding and Y following需要补充数据

@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
    throws HiveException {

  State s = (State) agg; //用来缓存中间结果的对象
  Object o = parameters[0]; // 输入的参数,可以认为是要求最大值的那一列数据,一行一行输入

  // 保证队列的顺序,又不至于存大量的数据
  // 维护一个数据由大到小,numRows由小到大的队列,
  // 当第numRows行的新数据o来了以后,从队尾队首遍历,删除比o小的元素,直到碰到比自己大跳出
  while (!s.maxChain.isEmpty()) {
    if (!removeLast(o, s.maxChain.getLast()[0])) {
      break;
    } else {
      s.maxChain.removeLast();
    }
  }

  // We need to insert 'null' before processing first row for the case: X preceding and y preceding
  // 特殊情况的准备条件
  if (s.numRows == 0) {
    for (int i = wFrameDef.getEnd().getRelativeOffset(); i < 0; i++) {
      s.results.add(null);
    }
  }

  /*
   * add row to chain. except in case of UNB preceding: - only 1 max needs
   * to be tracked. - current max will never become out of range. It can
   * only be replaced by a larger max.
   */
  // 除了无边界每一行都要被添加进去
  if (!wFrameDef.isStartUnbounded() || s.maxChain.isEmpty()) {
    o = o == null ? null : ObjectInspectorUtils.copyToStandardObject(o,
        inputOI(), ObjectInspectorCopyOption.JAVA);
    s.maxChain.addLast(new Object[] { o, s.numRows });
  }

  // 队列的第一个元素就是窗口的最大值,保存到result
  if (s.hasResultReady()) {
    s.results.add(s.maxChain.getFirst()[0]);
  }
  // 行数加一
  s.numRows++;

  // 这里是不是有bug,如果X preceding and y preceding
  // 1.1   这里并没有,这里是提前插入了空值,也就是当前的rownumber 计算出来的并不是当前行要展示的result
  // 判断队列第一个元素是不是在下一个窗口的范围内,如果不在直接删除
  int fIdx = (Integer) s.maxChain.getFirst()[1];
  if (!wFrameDef.isStartUnbounded()
      && s.numRows >= fIdx +  wFrameDef.getWindowSize()) {
    s.maxChain.removeFirst();
  }
}

疑问:case: X preceding and y preceding 怎么处理多余的数据

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-13 12:07:06  更:2021-08-13 12:07:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 9:19:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码