微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在不将数字格式更改为科学计数法或精度的情况下将大双数加载到PySpark DataFrame中并保留回去?

如何解决如何在不将数字格式更改为科学计数法或精度的情况下将大双数加载到PySpark DataFrame中并保留回去?

我有这样的CSV

import random
min = 1
max = 100
skill = 60
roll = (random.randint(min,max))
print ("Rolling the dices...")
print (roll)
hard_skill = skill/2
extreme_success = skill * .25

if roll > skill:
    print ("fail")
elif(roll <= hard_skill+1):
    print ("regular success")
elif(roll <= extreme_success):
    print("Extreme sucess")

由于项目的其他要求,我想加载具有列COL,VAL TEST,100000000.12345679 TEST2,200000000.1234 TEST3,9999.1234679123 作为数字类型的数据,然后按照以下结构将其持久化为另一个CSV:

VAL

我面临的问题是,每当我加载它时,数字就会变成科学计数法,并且我无法将其持久化而不必通知我的数据+-----+------------------+ | COL| VAL| +-----+------------------+ | TEST|100000000.12345679| |TEST2| 200000000.1234| |TEST3| 9999.1234679123| +-----+------------------+ precision(我想使用它已经存在于文件中的那个文件,无论它是什么-我无法推断出它)。 这是我尝试过的:

使用scale加载它会给我科学的记号:

DoubleType()

使用schema = StructType([ StructField('COL',StringType()),StructField('VAL',DoubleType()) ]) csv_file = "Downloads/test.csv" df2 = (spark.read.format("csv") .option("sep",",") .option("header","true") .schema(schema) .load(csv_file)) df2.show() +-----+--------------------+ | COL| VAL| +-----+--------------------+ | TEST|1.0000000012345679E8| |TEST2| 2.000000001234E8| |TEST3| 9999.1234679123| +-----+--------------------+ 加载它,我必须指定DecimalType()precision,否则,我会丢失点后的小数。但是,指定它后,除了可能得不到正确的值(因为我的数据可能会四舍五入)外,在点后我得到零: 例如,使用:scale我得到:

StructField('VAL',DecimalType(38,18))

意识到在这种情况下,我的新文件右边不需要零。

我发现解决此问题的唯一方法是使用[Row(COL='TEST',VAL=Decimal('100000000.123456790000000000')),Row(COL='TEST2',VAL=Decimal('200000000.123400000000000000')),Row(COL='TEST3',VAL=Decimal('9999.123467912300000000'))] ,其中我先使用UDF删除科学记号,然后将其转换为字符串以确保它能像我一样持久保存想要

float()

有没有不使用to_decimal = udf(lambda n: str(float(n))) df2 = df2.select("*",to_decimal("VAL").alias("VAL2")) df2 = df2.select(["COL","VAL2"]).withColumnRenamed("VAL2","VAL") df2.show() display(df2.schema) +-----+------------------+ | COL| VAL| +-----+------------------+ | TEST|100000000.12345679| |TEST2| 200000000.1234| |TEST3| 9999.1234679123| +-----+------------------+ StructType(List(StructField(COL,StringType,true),StructField(VAL,true))) 技巧的方法

谢谢!

解决方法

我发现解决这个问题的最好方法就是吼叫。它仍在使用UDF,但是现在,没有使用String的变通办法来避免科学计数法。我不会将其作为正确的答案,因为我仍然希望有人提出没有UDF的解决方案(或者很好地解释为什么没有UDF是不可能的)。

  1. CSV:
$ cat /Users/bambrozi/Downloads/testf.csv
COL,VAL
TEST,100000000.12345679
TEST2,200000000.1234
TEST3,9999.1234679123
TEST4,123456789.01234567
  1. 应用默认的PySpark DecimalType精度和比例加载CSV:
schema = StructType([
    StructField('COL',StringType()),StructField('VAL',DecimalType(38,18))
])

csv_file = "Downloads/testf.csv"
df2 = (spark.read.format("csv")
        .option("sep",",")
        .option("header","true")
        .schema(schema)
        .load(csv_file))

df2.show(truncate=False)

输出:

+-----+----------------------------+
|COL  |VAL                         |
+-----+----------------------------+
|TEST |100000000.123456790000000000|
|TEST2|200000000.123400000000000000|
|TEST3|9999.123467912300000000     |
|TEST4|123456789.012345670000000000|
+-----+----------------------------+
  1. 当您准备对其进行报告(打印或保存为新文件)时,可以将格式应用于尾随的零:
import decimal
import pyspark.sql.functions as F
normalize_decimals = F.udf(lambda dec: dec.normalize())
(df2
    .withColumn('VAL',normalize_decimals(F.col('VAL')))
    .show(truncate=False))

输出:

+-----+------------------+
|COL  |VAL               |
+-----+------------------+
|TEST |100000000.12345679|
|TEST2|200000000.1234    |
|TEST3|9999.1234679123   |
|TEST4|123456789.01234567|
+-----+------------------+
,

您可以使用spark来执行sql查询:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame,SparkSession}

val sparkConf: SparkConf = new SparkConf(true)
    .setAppName(this.getClass.getName)
    .setMaster("local[*]")

implicit val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

val df = spark.read.option("header","true").format("csv").load(csv_file)
df.createOrReplaceTempView("table")

val query = "Select cast(VAL as BigDecimal) as VAL,COL from table"
val result = spark.sql(query)
result.show()
result.coalesce(1).write.option("header","true").mode("overwrite").csv(outputPath + table)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。