微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在执行器上动态创建累加器

如何解决在执行器上动态创建累加器

我想使用累加器来计算我的 RDD 中对象的几个参数的组合。 例如,我的 RDD 为 Obj,字段为 ab。这两个字段都是枚举,可能具有少数值之一。 为了实现它,我应该在驱动程序上创建累加器并在工人上使用它:

val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...

我不想在所有具有相同逻辑的火花作业中为所有值组合声明大量计数器。 是否有任何机制允许在执行器上动态创建累加器或以其他方式解决此问题?

搜索类似的东西:

rdd.foreach{ getAccumulator("${obj.a} - ${obj.b}").add(1) }

解决方法

从字面上回答你的问题,你不能在执行器上动态注册新的累加器。在作业实际开始之前,必须在驱动程序 (sparkContext.accumulator()) 上计划累加器。这就是 Spark 中累加器的设计方式。

但考虑到您实际想要实现的内容,您可能会得出结论,只需一个“静态”累加器即可实现相同的功能,该累加器收集 Map<String,Long> 条目而不是 Long

This 博客文章可能会更实际地理解我在这里的意思。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。