一 写在前面
Hive的自定义函数(User-Defined Functions)分三类:
- UDF:one to one,进一出一,row mapping。是row级别操作,类似upper、substr等
- UDAF:many to one,进多出一,row mapping。是row级别操作,类似sum、min等
- UDTF:one to many ,进一出多。类似:alteral view与explode实现的一行变多行
接下来写一个统计统计字段长度的UDF
二 JAVA实现
1、UDF函数编写
1.1 前期准备
- 在IntelliJ IDEA里新建一个Maven项目
- 在pom.xml引入hive的依赖(注意版本号)
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies> 3.新建一个类(com.zhs.udf.FirstUDF)
1.2 FirstUDF代码的代码
package com.zhs.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
// extends 表示继承类。UDF需要继承GenericUDF
public class FirstUDF extends GenericUDF {
@Override
// ObjectInspector接口使得Hive可以不拘泥于一种特定数据格式, 使得数据流在输入端和输出端切换不同的格式
// UDF的初始化方法,主要用于验证输入参数的数据类型及输入参数的个数,返回值为ObjectInspector类型
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length != 1){
throw new UDFArgumentException("参数个数不为1");
}
// 基本类型的OI实例由工厂类 PrimitiveObjectInspectorFactory 创建
// 其它类型的OI实例由工厂类 ObjectInspectorFactory 创建
return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}
@Override
// 当UDF被调用时,对每行的数据进行逻辑处理
// DeferredObject 是封装的数组格式
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
// 1.取出输入数据
String input = deferredObjects[0].get().toString();
// 2.判断数据数据是否为null
if (input == null) {
return 0;
}
// 3.返回输入数据的长度
return input.length();
}
@Override
// 展示方法,主要用于打印UDF的一些基本信息,如MR的执行计划,一般返回空即可
public String getDisplayString(String[] strings) {
return "";
}
}
1.3 打包
2、上传Jar包
Tips1: UDF使用的jdk版本最好与Hadoop集群使用的jdk版本相同
Tips2: 可以使用Xftp将本地jar包上传到安装hive的虚拟机/远程服务器上
2.1 如果是临时用,可以上传到任一目录下
2.2 如果是永久用,需要把jar包上传到hdfs的lib目录下
-- 第一步
把jar包先上传到了/home/zhs/Documents/hive-demo-1.0-SNAPSHOT.jar路径下
-- 第二步
-- 1 可以看看有没有lib目录
hadoop fs -ls /lib
-- 2 如果没有,则在hadoop上创建lib目录
hadoop fs -mkdir /lib
-- 3 把jar复制到lib目录下
hadoop fs -put /home/zhs/Documents/hive-demo-1.0-SNAPSHOT.jar /lib/
-- 4 可以再看下有没有复制成功
hadoop fs -mkdir /lib
3、添加jar包到hive环境中(临时函数需要,永久函数不需要这步)
Tips:通过该方式添加的jar文件只存在于当前会话中,当会话关闭后不能够继续使用
// 先启动hive
// 语法:add jar +jar包所在的目录/jar包名字;
// /home/zhs/Documents/是jar包上传的目录
add jar /home/zhs/Documents/hive-demo-1.0-SNAPSHOT.jar;
4、临时函数
// 创建临时函数
// 语法:CREATE TEMPORARY FUNCTION function_name AS class_name;
// class_name 就是类名+包名
create temporary function my_len as "com.zhs.udf.FirstUDF"
// 销毁临时函数
// 语法:DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
drop temporary function my_len ;
5、永久函数
create function my_len as "com.zhs.udf.FirstUDF"
using jar 'hdfs://localhost:9000/lib/hive-demo-1.0-SNAPSHOT.jar';
6、调用/测试
select my_len("zhs");
-- 返回结果是 3
三 Python实现
就是试了试可以用,但真正的实战意义还没有琢磨
1、创建my_len.py并编写函数
practice.sc表如下:
# -*- coding: utf-8 -*-
import sys
for line in sys.stdin:
detail = line.strip().split(",")
sid = detail[0]
cid = detail[1]
socre = detail[2]
len_sid = len(sid)
len_cid = len(cid)
len_socre = len(socre)
len_list = (str(len_sid),str(len_cid),str(len_socre))
print("\t".join(len_list))
2、上传脚本
/home/zhs/Documents/my_len.py?
3、添加py脚本hive环境中
add jar /home/zhs/Documents/my_len.py;
4、调用
SELECT TRANSFORM (sid,cid,score)
USING 'python3 /home/zhs/Documents/my_len.py'
AS (len1,len2,len3)
FROM practice.sc;
输出示例
?
?
|