微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用尽可能少的内存来运行内存密集型火花流作业

如何解决如何使用尽可能少的内存来运行内存密集型火花流作业

我正在运行一项作业,该作业使用一个值将两个 kafka 主题合并为一个主题。我使用的环境只允许我为每个作业分配少于 10g 的内存,我尝试加入的数据每个主题大约有 500k 条记录。

我对 Spark 还很陌生,所以我想知道是否有办法最大限度地减少内存消耗

代码

val df_person: DataFrame = PERSONinformatION_df
      .select(from_json(expr("cast(value as string) as actualValue"),schemaPERSONinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfperson")

val df_candidate: DataFrame = CANDIDATEinformatION_df
      .select(from_json(expr("cast(value as string) as actualValue"),schemaCANDIDATEinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfcandidate")

加入主题

val joined_df : DataFrame = df_candidate.join(df_person,col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner").withColumn("join_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))

重构数据

val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID".as("candidateID"),$"FULLNAME",$"PERSONALID",$"join_date",$"dfcandidate.PERSONID".as("personID"),$"dfcandidate.comsume_date".as("candidate_comsume_time"),$"dfperson.comsume_date".as("person_comsume_time"))).cast("String").as("value"))

将它们写入主题

string2json.writeStream.format("kafka")
      .option("kafka.bootstrap.servers","xxx:9092")
      .option("topic","mergedinfo")
      .option("checkpointLocation","/tmp/producer/checkpoints")
      .option("failOnDataLoss",false)
      .start()
      .awaitTermination()

运行命令:

spark-submit --class taqasi_spark.App --master yarn ./spark_poc-test_memory.jar --executor-memory 10g --driver-memory 10g --executor-memory 10g  --deploy-mode cluster

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。