如何解决使用 map
我正在尝试通过 spark 将数据推送到 phoenix。我正在尝试创建要复制到 hdfs 的 HFile,以便 phoenix 可以直接读取。
val pdd = colSortedDf.map(row => {
(row(1).toString,(row(0).toString,row(2).toString,row(3).toString,row(4).toString,row(5).toString))
})
val tdd = pdd.flatMap(x => {
val rowKey=PChar.INSTANCE.toBytes(x._1)
for(i <- 0 until valCols.length) yield {
val colName = valCols(i).toString
val colValue: String = x._2.productElement(i).toString
val colFam = "CF1"
println((rowKey,(colFam,colName,colValue)))
(rowKey,colValue))
}
})
到这里程序运行良好,但下面的代码片段抛出错误。
val output = tdd.map(x => {
val rowKey: Array[Byte] = x._1
val immutableRowKey = new ImmutableBytesWritable(rowKey)
val colFam = x._2._1
val colName = x._2._2
val colValue = x._2._3
val kv = new KeyValue(rowKey,colFam.getBytes(),colName.getBytes(),colValue.getBytes()
)
(immutableRowKey,kv)
})
我正在尝试运行这是 spark-shell。当我在新的 spark-shell 中运行它时,我得到以下错误日志
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1502)
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1500)
第二次我运行相同的代码时,它给出了不同的错误
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1502)
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1500)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
当我尝试使用 take(10) 时,它可以工作,但它返回一个数组而不是数据集。
val output = tdd.take(3).map(x => {
val rowKey: Array[Byte] = x._1
val immutableRowKey = new ImmutableBytesWritable(rowKey)
val colFam = x._2._1
val colName = x._2._2
val colValue = x._2._3
val kv = new KeyValue(rowKey,kv)
})
我无法弄清楚出了什么问题,简单的地图不起作用并且可以与 take 一起使用。任何帮助都会非常有帮助。谢谢。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。