微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 Spark 累加器通过 SparkListener 检测 Pyspark 作业?

如何解决如何使用 Spark 累加器通过 SparkListener 检测 Pyspark 作业?

我正在尝试使用 pyspark 中的累加器来检测我的 pyspark 作业中的 udf 或自定义 spark 方法。我在 java/scala 中编写了一个自定义 SparkListener 来监听 onStageComplete 的累加器值。这是我拥有的 SparkListener

    public class MyListener extends SparkListener {
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        final Map<Object,AccumulableInfo> accumulableInfoMap = JavaConverters.
                mapAsJavaMapConverter(stageCompleted.stageInfo().accumulables())
                .asJava();
        accumulableInfoMap.forEach((k,accInfo) -> {
                    final String name = accInfo.name().getorElse(null);
                    final String nonNullName = Optional.ofNullable(name).orElse("");

                    final Optional<Double> accumResult = Optional.ofNullable(accInfo.value()
                            .getorElse(null))
                            .map(value -> {
                                Double val;
                                try {
                                    val = Double.valueOf(value.toString());
                                } catch (Exception e) {
                                    val = null;
                                }
                                return val;
                            });
                    System.out.println("Printing Accumulator");
                    System.out.println("name:" + name + " value:" + accumResult.orElse((0.0d)));                    }
        );
    }
}

当您使用 longAccumulator 编写 Scala 代码和检测时,这很有效。然而,命名累加器似乎还没有进入 pyspark yet

当我在使用以下命令 pyspark --driver-class-path MyJar-1.0.jar --conf spark.extraListeners=package.subpackage.MyListener 启动的 pyspark shell 中使用 pyspark 累加器时,sparklistener 不会拾取 pyspark 累加器。 MyJar-1.0.jar 包含我已经实现的 MyListener 类。

我在我的 pyspark shell 中使用以下测试代码

def filter_non_42(item,accumulator):
    if item % 2 == 0:
        accumulator += 1
    return '42' in str(item)

from functools import partial

accumulator = sc.accumulator(0)
counting_filter = partial(filter_non_42,accumulator=accumulator)

sc.range(0,10000).filter(counting_filter).sum()

我从我的 sparkListener 得到的输出如下

Printing Accumulator
name:internal.metrics.executorDeserializeTime value:794.0
Printing Accumulator
name:internal.metrics.executorcpuTime value:1.63990621E8
Printing Accumulator
name:internal.metrics.executorRunTime value:1066.0
Printing Accumulator
name:internal.metrics.jvmGCTime value:182.0
Printing Accumulator
name:internal.metrics.diskBytesspilled value:0.0
Printing Accumulator
name:internal.metrics.memoryBytesspilled value:0.0
Printing Accumulator
name:internal.metrics.executorDeserializecpuTime value:3.01491783E8
Printing Accumulator
name:internal.metrics.resultSize value:2928.0
49995000                

如上所示,它打印了所有原生火花累加器,但没有打印我的自定义累加器。我已经尝试过 pyspark SparkContext 代码(context.py),据我所知,pyspark 中的累加器独立于 scala spark 中的累加器。

为了解决这个问题,我尝试从 pyspark 获取 java spark 上下文对象,并按如下方式获取 java long accumulator。

acc = sc._jsc.sc().longAccumulator("MyAccomulator")

上面的方法效果很好,但是当在上面的 filter_non_42 函数中类似地使用时,pyspark 在尝试腌制这个累加器时遇到了问题。

我的问题是

  1. 在使用 pyspark 累加器时我是否遗漏了某些东西,有没有办法让它们出现在 sparklistener 中?
  2. 如果不能,我可以如何腌制基本上是上述方法中的 py4j JavaObject 的 scala spark 对象?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。