如何解决Spark 多阶段 mapWithState Shuffle 问题
我们有一个有状态的多阶段 DStream 应用程序,其中每个阶段都映射到不同的 Key。 在第一阶段之后,我们正在经历巨大的 Shuffle 读取和写入,在大约 20 个窗口左右之后,这种读取和写入持续增长并变得巨大。
val stageOnePartitionCount = 1000
val stageTwoPartitionCount = 1000
val stageThreePartitionCount = 1000
// Very little Shuffle for this stage consistent across all windows
val stageOneDataSet = dataSet
.map(data => (data.key1,data))
.reduceByKey(...,stageOnePartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key1
.numPartitions(stageOnePartitionCount)
)
// Shuffle for this stage grows every window and becomes huge after about 20 windows or so..
val stageTwoDataSet = stageOneDataSet
.map(data => (data.key2,stageTwoPartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key2
.numPartitions(stageTwoPartitionCount)
)
// Shuffle for this stage grows every window and becomes huge after about 20 windows or so..
val stageThreeDataSet = stageTwoDataSet
.map(data => (data.key3,stageThreePartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key3
.numPartitions(stageThreePartitionCount)
)
stageThreeDataSet
.foreachRDD(...)
我们在每个阶段的每个 reduceByKey
操作之前执行 mapWithState
(合并部分更新)操作,其中分区计数在它们之间匹配,目的是在 {{ 1}} 函数传入的流数据将已经在它的状态所在的节点。
那么是什么导致第二和第三阶段的 Shuffle 在每个经过的窗口中呈指数级上升,但第一阶段仍然保持高性能,所有窗口的 Shuffle 都很少?
注意:查看第二阶段和第三阶段被混洗(读/写)的数据量,我想知道是否有状态 RDD(包含状态数据)被混洗以共同定位它们传入流的大小将比 20 个窗口左右后的累积状态小得多。如果是这种情况,我们如何解决相同的问题。
感谢大家的帮助!!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。