刚刚接触hive,最近在研究UDAF,自己写了一个,代码如下:
package com.udaf.concat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
public class myConcat extends UDAF{
static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
public static class ConcatUDAFEvaluator implements UDAFEvaluator{
public static class PartialResult{
String result;
String delimiter;
}
private PartialResult partialResult;
@Override
public void init() {
partialResult.delimiter = new String("");
partialResult.result = new String("");
}
public boolean iterate(String value, String delimiter){
if(value == null){
return true;
}
if(partialResult == null){
partialResult = new PartialResult();
partialResult.result = new String("");
if(delimiter == null || "".equals(delimiter)){
partialResult.delimiter = new String(",");
}
else{
partialResult.delimiter = new String(delimiter);
}
}
if( partialResult.result.length() > 0){
partialResult.result = partialResult.result.concat(partialResult.delimiter);
}
partialResult.result = partialResult.result.concat(value);
return true;
}
public PartialResult terminatePartial() {
return partialResult;
}
public boolean mergr(PartialResult other) {
if(other == null){
return true;
}
if(partialResult == null){
partialResult = new PartialResult();
partialResult.result = new String(other.result);
partialResult.delimiter = new String(other.delimiter);
}
else{
if( partialResult.result.length() > 0){
partialResult.result = partialResult.result.concat(other.delimiter);
}
partialResult.result = partialResult.result.concat(other.result);
}
return true;
}
public String terminate() {
return partialResult.result;
}
}
}
在hive端:
>add jar /usr/Download/testUDAF.jar;
>creare temporatry function myconcat as 'com.udaf.concat.myConcat';
>select age,myconcat(name,',') from t_udaf group by age;
---------------------------------------------------------------------------
附t_udaf 表结构:
hive> describe extended t_udaf;
OK
namestring
ageint
Detailed Table InformationTable(tableName:t_udaf, dbName:test, owner:root, createTime:1418808574, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null)], location:hdfs://lntusl:9000/usr/hive/warehouse/test.db/t_udaf, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=/t, field.delim=/t}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{numPartitions=0, numFiles=1, transient_lastDdlTime=1418808709, numRows=0, totalSize=135, rawDataSize=0}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.111 seconds
但是,执行如下语句,报错:
hive> select age,myconcat(name,',') from t_udaf group by age;
FAILED: Hive Internal Error: java.lang.NullPointerException(null)
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.<init>(GenericUDFUtils.java:215)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.init(GenericUDAFBridge.java:129)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getGenericUDAFInfo(SemanticAnalyzer.java:2452)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genGroupByPlanGroupByOperator1(SemanticAnalyzer.java:2740)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genGroupByPlanMapAggr1MR(SemanticAnalyzer.java:3677)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genBodyPlan(SemanticAnalyzer.java:6125)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6762)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7531)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:431)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:336)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:909)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:258)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:215)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:406)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:689)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:557)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
求指点!。~。
package com.udaf.concat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
public class myConcat extends UDAF{
static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
public static class ConcatUDAFEvaluator implements UDAFEvaluator{
public static class PartialResult{
String result;
String delimiter;
}
private PartialResult partialResult;
@Override
public void init() {
partialResult.delimiter = new String("");
partialResult.result = new String("");
}
public boolean iterate(String value, String delimiter){
if(value == null){
return true;
}
if(partialResult == null){
partialResult = new PartialResult();
partialResult.result = new String("");
if(delimiter == null || "".equals(delimiter)){
partialResult.delimiter = new String(",");
}
else{
partialResult.delimiter = new String(delimiter);
}
}
if( partialResult.result.length() > 0){
partialResult.result = partialResult.result.concat(partialResult.delimiter);
}
partialResult.result = partialResult.result.concat(value);
return true;
}
public PartialResult terminatePartial() {
return partialResult;
}
public boolean mergr(PartialResult other) {
if(other == null){
return true;
}
if(partialResult == null){
partialResult = new PartialResult();
partialResult.result = new String(other.result);
partialResult.delimiter = new String(other.delimiter);
}
else{
if( partialResult.result.length() > 0){
partialResult.result = partialResult.result.concat(other.delimiter);
}
partialResult.result = partialResult.result.concat(other.result);
}
return true;
}
public String terminate() {
return partialResult.result;
}
}
}
在hive端:
>add jar /usr/Download/testUDAF.jar;
>creare temporatry function myconcat as 'com.udaf.concat.myConcat';
>select age,myconcat(name,',') from t_udaf group by age;
---------------------------------------------------------------------------
附t_udaf 表结构:
hive> describe extended t_udaf;
OK
namestring
ageint
Detailed Table InformationTable(tableName:t_udaf, dbName:test, owner:root, createTime:1418808574, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null)], location:hdfs://lntusl:9000/usr/hive/warehouse/test.db/t_udaf, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=/t, field.delim=/t}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{numPartitions=0, numFiles=1, transient_lastDdlTime=1418808709, numRows=0, totalSize=135, rawDataSize=0}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.111 seconds
但是,执行如下语句,报错:
hive> select age,myconcat(name,',') from t_udaf group by age;
FAILED: Hive Internal Error: java.lang.NullPointerException(null)
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.<init>(GenericUDFUtils.java:215)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.init(GenericUDAFBridge.java:129)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getGenericUDAFInfo(SemanticAnalyzer.java:2452)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genGroupByPlanGroupByOperator1(SemanticAnalyzer.java:2740)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genGroupByPlanMapAggr1MR(SemanticAnalyzer.java:3677)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genBodyPlan(SemanticAnalyzer.java:6125)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genPlan(SemanticAnalyzer.java:6762)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:7531)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:243)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:431)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:336)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:909)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:258)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:215)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:406)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:689)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:557)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
求指点!。~。