微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Spark-Scala 中将 Dataset[Row] 转换为 RDD[Array[String]] 的最佳方法?

如何解决在 Spark-Scala 中将 Dataset[Row] 转换为 RDD[Array[String]] 的最佳方法?

我正在通过读取 csv 文件创建一个 spark 数据集。此外,我需要将此 Dataset[Row] 转换为 RDD[Array[String]] 以将其传递给 FpGrowth(Spark MLLIB)。

val df: DataFrame = spark.read.format("csv").option("header","true").load("/path/to/csv")
val ds: Dataset[Row] = df.groupBy("user").agg(collect_set("values"))

现在,我需要选择列“values”并将结果数据集转换为RDD[Array[String]]

val rddS: RDD[String] = ds.select(concat_ws(",",col("values")).as("items")).distinct().rdd.map(_.mkString(","))
val rddArray: RDD[Array[String]] = rddS.map(s => s.trim.split(','))

我尝试了这种方法,但不确定它是否是最好的方法。请建议我实现这一目标的最佳方式。

解决方法

单线:

df = pd.DataFrame()
for i in unique(mbr_base['location']):
    rst = '''select * from where location = 'i'; '''
    rst_df = pd.to_dataframe(rst)
    pd.concat([df,rst_df],axis=0)

display(df)

顺便说一下,我建议使用基于数据帧的 Spark ML,而不是基于 RDD 的 Spark MLLib,后者现已弃用。您可以使用 val rddArray: RDD[Array[String]] = ds.select("values").as[Array[String]].rdd

,

我最终使用了 toSeq 方法

val rddArray: RDD[Array[String]] = ds.select("values").rdd.map(r => r.getSeq[String](0).toArray)

这对我的用例来说更有效(更快)。

,

为什么不简单地使用如下,您将减少concat_wssplit操作。

val rddS:RDD[Array[String]] = ds.select("values")
    .distinct()
    .rdd.map(r => r.getAs[mutable.WrappedArray[String]](0).toArray)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。