Hive 中 udf、udaf 和 udtf 的使用
Hive 是基于 Hadoop 中的 MapReduce,提供 HQL 查询的数据仓库.
Hive 是一个很开放的系统,很多内容都支持用户定制. 如 : 文件格式、MR脚本、自定义函数、自定义聚合函数 等.
UDF
编写 UDF函数 的时候需要注意一下几点:
自定义 UDF 需要继承 org.apache.hadoop.hive.ql.UDF
需要实现 evaluate
函数
以下是两个数求和函数的UDF。evaluate函数代表两个整型数据相加
1 2 3 4 5 6 7 8 9 10 11 12 13 package hive.connect; import org.apache.hadoop.hive.ql.exec.UDF; public final class Add extends UDF { public Integer evaluate (Integer a, Integer b) { if (null == a || null == b) { return null ; } return a + b; } }
UDAF
函数类需要继承 UDAF 类,内部类 Evaluator 需要实现 UDAFEvaluator 接口.
Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate 这几个函数.
init
函数实现接口 UDAFEvaluator 的 init 函数.
iterate
接收传入的参数,并进行内部的轮转。其返回类型为 boolean.
terminatePartial
无参数,其为 iterate 函数轮转结束后,返回轮转数据.
merge
接收 terminatePartial 的返回结果,进行数据 merge 操作,其返回类型为boolean.
terminate
返回最终的聚集函数结果.
下面是一个简单的 UDAF 的 demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.x.user_bhv;import com.google.common.collect.Maps;import org.apache.hadoop.hive.ql.exec.UDAF;import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;import java.util.HashMap;import java.util.Map;public class UDAFMergeIntToIntMap extends UDAF { public static class PartialResult { Map<Integer, Integer> attributes; PartialResult() { attributes = Maps.newHashMap(); } } public static class UnitIdUDAFEvaluator implements UDAFEvaluator { private PartialResult partialResult; public UnitIdUDAFEvaluator () { super (); init(); } public void init () { System.out.println("map init" ); partialResult = new PartialResult(); } public boolean iterate (Map<Integer, Integer> attributes_args) { if (attributes_args == null || attributes_args.isEmpty()) { return true ; } for (Map.Entry<Integer, Integer> entry : attributes_args.entrySet()) { this .partialResult.attributes.put(entry.getKey(), entry.getValue()); } return true ; } public PartialResult terminatePartial () { return this .partialResult; } public boolean merge (PartialResult other) { for (Map.Entry<Integer, Integer> entry : other.attributes.entrySet()) { this .partialResult.attributes.put(entry.getKey(), entry.getValue()); } return true ; } public Map<Integer, Integer> terminate () { if (partialResult == null ) { return new HashMap<Integer, Integer>(); } else { return this .partialResult.attributes; } } } public static void main (String[] args) { } }
在 Hive 脚本中的使用示例 :
1 2 3 4 5 6 7 8 9 hql="ADD jar ${jar_dir} /user_bhv_for_hive.jar; CREATE TEMPORARY FUNCTION merge_int_to_int_map AS 'com.x.user_bhv.UDAFMergeIntToIntMap'; INSERT OVERWRITE TABLE ${table_user_buy_category} SELECT mobile_number, merge_int_to_int_map (level1_id_count_map) FROM ods_dm_e_coupon GROUP BY mobile_number
Summary
重载 evaluate 函数.
UDF 函数中参数类型可以为Writable,也可为java中的基本数据对象.
UDF 支持变长的参数.
Hive 支持隐式类型转换.
客户端退出时,创建的临时函数自动销毁.
evaluate函数必须要返回类型值,空的话返回null,不能为void类型.
UDF 和 UDAF 都可以重载.
查看函数 SHOW FUNCTIONS.
UDAF: User Defined Aggregation Function
Reference
Checking if Disqus is accessible...