微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Spark 中,原始 Java 代码究竟在哪里执行?

如何解决在 Spark 中,原始 Java 代码究竟在哪里执行?

我是 Spark 的新手,我知道 Spark 通常会序列化函数并将其发送给所有执行程序并处理 HDFS 中可用的数据块。但是如果我有以下代码

Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
JavaPairRDD<String,Integer> pairRDD = mapRandom.mapToPair(s -> {
    return new Tuple2<>(s,randomValue);
});

语句 A,B 到底在哪里运行?当然,它不会在每个执行器中执行它,因为每个执行器都运行自己的 JVM,我发现每个值都映射到完全相同的 randomValue

这是否意味着这些语句 A,B 在 Driver 中运行并被复制到所有执行程序?

解决方法

A 和 B 在驱动程序上运行并发送到执行程序。

如果你想在执行器中运行它,你应该这样做:

JavaPairRDD<String,Integer> pairRDD = mapRandom.mapToPair(s -> {
    Random random = new Random(); //statement A
    int randomValue = random.nextInt(); //statement B
    return new Tuple2<>(s,randomValue);
});
,

RDD 是分布式的(因此得名)但它是在驱动程序代码中定义的,所以不与 RDD 内容交互的是在驱动程序上执行的代码(根据经验),而任何具有与 RDD 内容有关的内容将在执行程序上运行。

如您所见,驱动程序计算您的 randomValue 一次并将其发送给所有执行程序,因为 mapToPair lambda 参数关闭了该计算值。

在 lambda 中移动(仅)random.nextInt() 调用将触发为分布式集合中的每个元素执行该调用。此外,random 值本身将被序列化并通过网络发送。

将随机创建本身移动到 lambda 中会使其更小(无需捕获外部状态),但会为分布式集合中的每个元素创建一个新的 Random 实例,这显然是次优的。

每个执行器有一个随机实例值,您可以将其设置为每个 JVM/执行器初始化一次的静态成员(或惰性单例)。要为每个(例如)分区设置不同的随机值,您可能应该使用 forEachPartition 或类似方法,使用 nextInt() 生成一个新值,并将该值用于该分区中的所有元素。

查看更高级别的 DataFrame/SQL API,因为您可能会发现实现您想要的东西要容易得多,而不必担心代码在哪里执行。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。