微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

有没有一种方法可以将值范围的列添加到Spark Dataframe中?

如何解决有没有一种方法可以将值范围的列添加到Spark Dataframe中?

我有一个spark数据帧:df1如下:

age = spark.createDataFrame(["10","11","13"],"string").toDF("age")
age.show()
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+

我需要在数据框中添加一个行号列以使其:

+---+------+
|age|col_id|
+---+------+
| 10|   1  |
| 11|   2  |
| 13|   3  |
+---+------+

我的数据框中的任何列都不包含唯一值。 我尝试使用F.monotonically_increasing_id()),但它只是按递增顺序生成随机数。

>>> age = spark.createDataFrame(["10","string").toDF("age").withColumn("rowId1",F.monotonically_increasing_id())
>>> age
DataFrame[age: string,rowId1: bigint]
>>> age.show
<bound method DataFrame.show of DataFrame[age: string,rowId1: bigint]>
>>> age.show()
+---+-----------+
|age|     rowId1|
+---+-----------+
| 10|17179869184|
| 11|42949672960|
| 13|60129542144|
+---+-----------+

由于我没有任何列包含唯一数据,因此我担心使用窗口功能生成row_numbers。 因此,有没有一种方法可以将包含row_count的列添加到提供以下内容的数据框中:

+---+------+
|age|col_id|
+---+------+
| 10|   1  |
| 11|   2  |
| 13|   3  |
+---+------+

如果开窗功能是唯一的实现方式,那么如何确保所有数据都位于单个分区下? 或者如果有一种无需使用窗口函数即可实现的方法,该如何实现呢? 任何帮助表示赞赏。

解决方法

使用zipWithIndex

由于我忙于处理问题,昨天找不到过去做过的代码,但这是一篇很好的文章,对它进行了解释。 https://sqlandhadoop.com/pyspark-zipwithindex-example/

pyspark与Scala不同。

其他对性能不好的答案-转到单个执行程序。 zipWithIndexnarrow transformation,因此它可以按partition使用。

在这里,您可以相应地进行剪裁:

from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType,LongType
import pyspark.sql.functions as F

df1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'),('abc'),('4') ],StringType())

schema = StructType(df1.schema.fields[:] + [StructField("index",LongType(),True)])
rdd = df1.rdd.zipWithIndex()
rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df1 = spark.createDataFrame(rdd1,schema)
df1.show()

返回:

+-----+-----+
|value|index|
+-----+-----+
|  abc|    0|
|    2|    1|
|    3|    2|
|    4|    3|
|  abc|    4|
|    2|    5|
|    3|    6|
|    4|    7|
|  abc|    8|
|    2|    9|
|    3|   10|
|    4|   11|
+-----+-----+
,

假设:此答案基于col_id的顺序应取决于age列的假设。如果假设不成立,则其他建议的解决方案是zipWithIndex中提到的问题。在this answer中可以找到zipWithIndex的用法示例。

建议的解决方案: 您可以将window与空的partitionBy和行号一起使用以获取期望的数字。

from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy().orderBy(F.col('age').asc())
age = age.withColumn(
    'col_id',F.row_number().over(windowSpec)
)

[EDIT]添加需求假设并引用替代解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。