如何解决如何使用尽可能少的内存来运行内存密集型火花流作业
我正在运行一项作业,该作业使用一个值将两个 kafka 主题合并为一个主题。我使用的环境只允许我为每个作业分配少于 10g 的内存,我尝试加入的数据每个主题大约有 500k 条记录。
我对 Spark 还很陌生,所以我想知道是否有办法最大限度地减少内存消耗
代码:
val df_person: DataFrame = PERSONinformatION_df
.select(from_json(expr("cast(value as string) as actualValue"),schemaPERSONinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfperson")
val df_candidate: DataFrame = CANDIDATEinformatION_df
.select(from_json(expr("cast(value as string) as actualValue"),schemaCANDIDATEinformatION).as("s")).select("s.*").withColumn("comsume_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS")))).as("dfcandidate")
加入主题:
val joined_df : DataFrame = df_candidate.join(df_person,col("dfcandidate.PERSONID") === col("dfperson.ID"),"inner").withColumn("join_date",lit(LocalDateTime.Now.format(DateTimeFormatter.ofPattern("HH:mm:ss.SS"))))
重构数据
val string2json: DataFrame = joined_df.select($"dfcandidate.ID".as("key"),to_json(struct($"dfcandidate.ID".as("candidateID"),$"FULLNAME",$"PERSONALID",$"join_date",$"dfcandidate.PERSONID".as("personID"),$"dfcandidate.comsume_date".as("candidate_comsume_time"),$"dfperson.comsume_date".as("person_comsume_time"))).cast("String").as("value"))
将它们写入主题
string2json.writeStream.format("kafka")
.option("kafka.bootstrap.servers","xxx:9092")
.option("topic","mergedinfo")
.option("checkpointLocation","/tmp/producer/checkpoints")
.option("failOnDataLoss",false)
.start()
.awaitTermination()
运行命令:
spark-submit --class taqasi_spark.App --master yarn ./spark_poc-test_memory.jar --executor-memory 10g --driver-memory 10g --executor-memory 10g --deploy-mode cluster
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。