如何解决如何使用 Spark 累加器通过 SparkListener 检测 Pyspark 作业?
我正在尝试使用 pyspark 中的累加器来检测我的 pyspark 作业中的 udf 或自定义 spark 方法。我在 java/scala 中编写了一个自定义 SparkListener 来监听 onStageComplete 的累加器值。这是我拥有的 SparkListener
public class MyListener extends SparkListener {
@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
final Map<Object,AccumulableInfo> accumulableInfoMap = JavaConverters.
mapAsJavaMapConverter(stageCompleted.stageInfo().accumulables())
.asJava();
accumulableInfoMap.forEach((k,accInfo) -> {
final String name = accInfo.name().getorElse(null);
final String nonNullName = Optional.ofNullable(name).orElse("");
final Optional<Double> accumResult = Optional.ofNullable(accInfo.value()
.getorElse(null))
.map(value -> {
Double val;
try {
val = Double.valueOf(value.toString());
} catch (Exception e) {
val = null;
}
return val;
});
System.out.println("Printing Accumulator");
System.out.println("name:" + name + " value:" + accumResult.orElse((0.0d))); }
);
}
}
当您使用 longAccumulator 编写 Scala 代码和检测时,这很有效。然而,命名累加器似乎还没有进入 pyspark yet。
当我在使用以下命令 pyspark --driver-class-path MyJar-1.0.jar --conf spark.extraListeners=package.subpackage.MyListener
启动的 pyspark shell 中使用 pyspark 累加器时,sparklistener 不会拾取 pyspark 累加器。 MyJar-1.0.jar 包含我已经实现的 MyListener 类。
我在我的 pyspark shell 中使用以下测试代码。
def filter_non_42(item,accumulator):
if item % 2 == 0:
accumulator += 1
return '42' in str(item)
from functools import partial
accumulator = sc.accumulator(0)
counting_filter = partial(filter_non_42,accumulator=accumulator)
sc.range(0,10000).filter(counting_filter).sum()
我从我的 sparkListener 得到的输出如下
Printing Accumulator
name:internal.metrics.executorDeserializeTime value:794.0
Printing Accumulator
name:internal.metrics.executorcpuTime value:1.63990621E8
Printing Accumulator
name:internal.metrics.executorRunTime value:1066.0
Printing Accumulator
name:internal.metrics.jvmGCTime value:182.0
Printing Accumulator
name:internal.metrics.diskBytesspilled value:0.0
Printing Accumulator
name:internal.metrics.memoryBytesspilled value:0.0
Printing Accumulator
name:internal.metrics.executorDeserializecpuTime value:3.01491783E8
Printing Accumulator
name:internal.metrics.resultSize value:2928.0
49995000
如上所示,它打印了所有原生火花累加器,但没有打印我的自定义累加器。我已经尝试过 pyspark SparkContext 代码(context.py),据我所知,pyspark 中的累加器独立于 scala spark 中的累加器。
为了解决这个问题,我尝试从 pyspark 获取 java spark 上下文对象,并按如下方式获取 java long accumulator。
acc = sc._jsc.sc().longAccumulator("MyAccomulator")
上面的方法效果很好,但是当在上面的 filter_non_42 函数中类似地使用时,pyspark 在尝试腌制这个累加器时遇到了问题。
我的问题是
- 在使用 pyspark 累加器时我是否遗漏了某些东西,有没有办法让它们出现在 sparklistener 中?
- 如果不能,我可以如何腌制基本上是上述方法中的 py4j JavaObject 的 scala spark 对象?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。