微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark 多阶段 mapWithState Shuffle 问题

如何解决Spark 多阶段 mapWithState Shuffle 问题

我们有一个有状态的多阶段 DStream 应用程序,其中每个阶段都映射到不同的 Key。 在第一阶段之后,我们正在经历巨大的 Shuffle 读取和写入,在大约 20 个窗口左右之后,这种读取和写入持续增长并变得巨大。

    val stageOnePartitionCount = 1000
    val stageTwoPartitionCount = 1000
    val stageThreePartitionCount = 1000

    // Very little Shuffle for this stage consistent across all windows 
    val stageOneDataSet = dataSet
     .map(data => (data.key1,data))
     .reduceByKey(...,stageOnePartitionCount)
     .mapWithState(
       StateSpec
        .function(...)
        .initialState(...) // uses key1
        .numPartitions(stageOnePartitionCount)
     )

    // Shuffle for this stage grows every window and becomes huge after about 20 windows or so..  
    val stageTwoDataSet = stageOneDataSet
     .map(data => (data.key2,stageTwoPartitionCount)
     .mapWithState(
       StateSpec
        .function(...)
        .initialState(...) // uses key2
        .numPartitions(stageTwoPartitionCount)
     )

    // Shuffle for this stage grows every window and becomes huge after about 20 windows or so..  
    val stageThreeDataSet = stageTwoDataSet
     .map(data => (data.key3,stageThreePartitionCount)
     .mapWithState(
       StateSpec
        .function(...)
        .initialState(...) // uses key3
        .numPartitions(stageThreePartitionCount)
     )

    stageThreeDataSet
     .foreachRDD(...)

我们在每个阶段的每个 reduceByKey 操作之前执行 mapWithState(合并部分更新)操作,其中分区计数在它们之间匹配,目的是在 {{ 1}} 函数传入的流数据将已经在它的状态所在的节点。

那么是什么导致第二和第三阶段的 Shuffle 在每个经过的窗口中呈指数级上升,但第一阶段仍然保持高性能,所有窗口的 Shuffle 都很少?

注意:查看第二阶段和第三阶段被混洗(读/写)的数据量,我想知道是否有状态 RDD(包含状态数据)被混洗以共同定位它们传入流的大小将比 20 个窗口左右后的累积状态小得多。如果是这种情况,我们如何解决相同的问题。

感谢大家的帮助!!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。