根据我的具体需求，我想编写一个 UDAF，它只是把所有输入行收集起来。
输入行有两列，类型均为 DoubleType；
我认为中间模式（buffer schema）应该是 ArrayList（如果我错了，请纠正我）；
返回的数据类型也是 ArrayList。
我写了一个 UDAF 的草稿（如下），希望有人帮我把它补充完整：
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Draft UDAF intended to collect the `value` and `y` columns of every input row.
 *
 * The schema members are filled in with valid Spark SQL types; the aggregation
 * methods are still unimplemented placeholders (`???` throws NotImplementedError).
 */
class CollectorUDAF() extends UserDefinedAggregateFunction {
  // Input Data Type Schema: two Double columns, "value" and "y".
  def inputSchema: StructType =
    StructType(Array(StructField("value", DoubleType), StructField("y", DoubleType)))

  // Intermediate Schema: Spark requires a StructType here, not a java.util.ArrayList.
  // A single ArrayType column can hold the values collected so far.
  def bufferSchema: StructType =
    StructType(Array(StructField("value", ArrayType(DoubleType))))

  // Returned Data Type: must also be a Spark SQL DataType; array<double> here.
  def dataType: DataType = ArrayType(DoubleType)

  // Declares that the same input always yields the same output.
  def deterministic: Boolean = true

  // Sets the buffer to its initial (zero) value for a new aggregation.
  def initialize(buffer: MutableAggregationBuffer): Unit = ??? // TODO

  // Folds one input row into the buffer.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = ??? // TODO

  // Called after all the entries are exhausted: builds the final result from the buffer.
  def evaluate(buffer: Row): Any = ??? // TODO

  // Combines the partial buffer buffer2 into buffer1.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ??? // TODO
}
解决方法
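如果我没有理解错你的问题，解决方案如下。需要注意：中间模式（bufferSchema）必须是 Spark SQL 的 StructType，返回类型（dataType）也必须是 Spark SQL 的 DataType（例如 ArrayType(DoubleType)），而不能是 java.util.ArrayList。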
import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
 * Collects the `value` and `y` columns of every row in a group into one flat
 * array<double>: row1.value, row1.y, row2.value, row2.y, ...
 *
 * Null input values are skipped (previously `getDouble` threw on them).
 * Element order across partitions depends on the order Spark merges partial
 * buffers and is not guaranteed.
 *
 * Note: every `update` copies the buffer contents into a new Seq, so the cost
 * grows quadratically with group size. For production code the built-in
 * `collect_list`, or an `Aggregator` registered with `functions.udaf`
 * (Spark 3.0+, where UserDefinedAggregateFunction is deprecated), is preferable.
 */
class CollectorUDAF() extends UserDefinedAggregateFunction {
  // Input Data Type Schema: two Double columns, "value" and "y".
  def inputSchema: StructType =
    new StructType().add("value", DataTypes.DoubleType).add("y", DataTypes.DoubleType)

  // Intermediate Schema: a single array column. Its element type must be DoubleType
  // to match the values written by initialize/update/merge (it was StringType before,
  // which does not match the stored Doubles).
  val bufferFields: util.ArrayList[StructField] = new util.ArrayList[StructField]
  val bufferStructField: StructField =
    DataTypes.createStructField("array", DataTypes.createArrayType(DataTypes.DoubleType, true), true)
  bufferFields.add(bufferStructField)
  def bufferSchema: StructType = DataTypes.createStructType(bufferFields)

  // Returned Data Type: array<double>.
  def dataType: DataType = DataTypes.createArrayType(DataTypes.DoubleType)

  // The same input always yields the same output.
  def deterministic: Boolean = true

  // Starts a new aggregation with an empty list of values.
  // (`buffer(0) = x` is sugar for `buffer.update(0, x)`; the former
  // `buffer(0, x)` called a non-existent two-argument apply.)
  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Double]

  // Appends the non-null columns of one input row, in column order.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val rowValues = (0 until input.length).collect {
      case i if !input.isNullAt(i) => input.getDouble(i)
    }
    buffer(0) = buffer.getSeq[Double](0) ++ rowValues
  }

  // Concatenates the partial buffer buffer2 onto buffer1.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)

  // Called after all the entries are exhausted: returns the collected values.
  def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。