如何解决Spark - 将通用数组传递给 GenericRowWithSchema
我正在尝试构建一个 Spark 实用程序,该实用程序从 HBase 表读取数据并写入 Hive 表。这里的先决条件是所有列都是字符串类型。
我的方法是在 RDD[(ImmutableBytesWritable,Result)]
中读取 Hbase 表,然后将其转换为 RDD[GenericRowWithSchema]
,然后转换为 Spark 数据帧。我对第一部分很好,但在第二部分面临问题。我的代码看起来像这样
def parseRow(result: Result,hiveColumns: Array[String],tableSchema: StructType): = {
val rowKey = Bytes.toString(result.getRow)
val cfDataBytes = Bytes.toBytes("cf")
val colArray = hiveColumns.map(col => Bytes.toString(result.getValue(cfDataBytes,Bytes.toBytes(col))))
new GenericRowWithSchema(colArray,tableSchema)
}
我在声明 parseRow
函数时出错
error:Type mismatch
found: Array[String]
required: Array[Any]
Note: String <: Any,but class Array is invariant in type T
You may wish to investigate a wildcard type such as `_<:Any`. (SLS 3.2.10)
new GenericRowWithSchema(colArray,tableSchema)
^
我使用的功能如下
val tableName = "xyz" // This will be an arg that is passed to the job
val hiveColumns: Array[String] = spark.table(tableName)
val tableSchema: StructType = StructType(hiveColumns.map(colName => StructField(colName,DataTypes.StringType,false)))
val hbaseConf = HBaseConfiguration.create
val scan: Scan = new Scan
hbaseConf.set(TableInputFormat.INPUT_TABLE,"hbaseSchema:hbaseTable")
hbaseConf.set(TableInputFormat.SCAN,TableMapReduceUtil.converScanToString(scan))
val hbaseRDD: RDD[(ImmutableBytesWritable,Result)] = sc.newAPIHadoopRDD(hbaseConfig,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
import spark.implicits._
val resultRDD: RDD[Result] = hbaseRDD.map(tuple => tuple._2)
val finalRDD: RDD[GenericRowWithSchema] = resultRDD.map(result => parseRow(result,hiveColumns,tableSchema))
如何将 Array[String] 的元素包裹在 Array[Any] 中,以便构造函数可以接受?
解决方法
我们只需要将 Array[String]
扩大到 Array[Any]
如下
new GenericRowWithSchema(colArray.toArray,tableSchema)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。