我非常满意Spark 2.0 DataSet,因为它的编译时类型安全.但是这里有几个我无法解决的问题,我也没有找到好的文档.
问题#1 – 在聚合列上划分操作 –
考虑下面的代码 –
我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/ 8上进行groupByKey.如果我只是计算总和但它给了divide(8)的编译时错误,下面的代码效果很好.我想知道如何实现以下目标.
final case class MyClass (c1: String,c2: String,c3: String,c4: Double) val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded import sparkSession.implicits._ import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum} myCaseClass. groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c2,myCaseClass.c3)). agg(typedSum[MyCaseClass](_.c4).name("sum(c4)"). divide(8)). //this is breaking with exception show()
如果我删除.divide(8)操作并运行上面的命令它会给我低于输出.
+-----------+-------------+ | key|sum(c4) | +-----------+-------------+ | [A1,F2,S1]| 80.0| | [A1,F1,S1]| 40.0| +-----------+-------------+
问题#2 – 将groupedByKey结果转换为另一个Typed DataFrame –
现在问题的第二部分是我想再次输出一个类型化的DataSet.为此,我有另一个案例类(不确定是否需要),但我不知道如何映射分组结果 –
final case class AnotherClass(c1: String,average: Double) myCaseClass. groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3)). agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")). as[AnotherClass] //this is breaking with exception
但是这又失败了,因为按键分组的结果并没有直接映射到AnotherClass.
PS:上述任何其他解决方案都非常受欢迎.
解决方法
第一个问题可以通过一直使用类型列来解决(keyvalueGroupedDataset.agg需要TypedColumn(-s))
您可以将聚合结果定义为:
您可以将聚合结果定义为:
val eight = lit(8.0) .as[Double] // Not necessary val sumByEight = typedSum[MyClass](_.c4) .divide(eight) .as[Double] // required .name("div(sum(c4),8)")
并将其插入以下代码:
val myCaseClass = Seq( MyClass("a","b","c",2.0),MyClass("a",3.0) ).toDS myCaseClass .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3)) .agg(sumByEight)
要得到
+-------+---------------+ | key|div(sum(c4),8)| +-------+---------------+ |[a,b,c]| 0.625| +-------+---------------+
第二个问题是使用不符合数据形状的类的结果.正确的表示可能是:
case class AnotherClass(key: (String,String,String),sum: Double)
与上面定义的数据一起使用:
myCaseClass .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3)) .agg(typedSum[MyClass](_.c4).name("sum")) .as[AnotherClass]
会给:
+-------+---+ | key|sum| +-------+---+ |[a,c]|5.0| +-------+---+
但是如果数据集[((String,Double)]可以接受,那么[AnotherClass]就没有必要了.
您当然可以跳过所有这些并且只是mapGroups(尽管不会没有性能损失):
import shapeless.Syntax.std.tuple._ // A little bit of shapeless val tuples = myCaseClass .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3)) .mapGroups((group,iter) => group :+ iter.map(_.c4).sum)
结果
+---+---+---+---+ | _1| _2| _3| _4| +---+---+---+---+ | a| b| c|5.0| +---+---+---+---+
reduceGroups可能是更好的选择:
myCaseClass .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3)) .reduceGroups((x,y) => x.copy(c4=x.c4 + y.c4))
结果数据集:
+-------+-----------+ | _1| _2| +-------+-----------+ |[a,c]|[a,c,5.0]| +-------+-----------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。