如何解决在 Spark 中,原始 Java 代码究竟在哪里执行?
我是 Spark 的新手,我知道 Spark 通常会序列化函数并将其发送给所有执行程序并处理 HDFS 中可用的数据块。但是如果我有以下代码,
Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
JavaPairRDD<String,Integer> pairRDD = mapRandom.mapToPair(s -> {
return new Tuple2<>(s,randomValue);
});
语句 A
,B
到底在哪里运行?当然,它不会在每个执行器中执行它,因为每个执行器都运行自己的 JVM,我发现每个值都映射到完全相同的 randomValue
。
这是否意味着这些语句 A
,B
在 Driver 中运行并被复制到所有执行程序?
解决方法
A 和 B 在驱动程序上运行并发送到执行程序。
如果你想在执行器中运行它,你应该这样做:
JavaPairRDD<String,Integer> pairRDD = mapRandom.mapToPair(s -> {
Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
return new Tuple2<>(s,randomValue);
});
,
RDD 是分布式的(因此得名)但它是在驱动程序代码中定义的,所以不与 RDD 内容交互的是在驱动程序上执行的代码(根据经验),而任何具有与 RDD 内容有关的内容将在执行程序上运行。
如您所见,驱动程序计算您的 randomValue
一次并将其发送给所有执行程序,因为 mapToPair
lambda 参数关闭了该计算值。
在 lambda 中移动(仅)random.nextInt()
调用将触发为分布式集合中的每个元素执行该调用。此外,random
值本身将被序列化并通过网络发送。
将随机创建本身移动到 lambda 中会使其更小(无需捕获外部状态),但会为分布式集合中的每个元素创建一个新的 Random
实例,这显然是次优的。
要每个执行器有一个随机实例值,您可以将其设置为每个 JVM/执行器初始化一次的静态成员(或惰性单例)。要为每个(例如)分区设置不同的随机值,您可能应该使用 forEachPartition
或类似方法,使用 nextInt()
生成一个新值,并将该值用于该分区中的所有元素。
查看更高级别的 DataFrame/SQL API,因为您可能会发现实现您想要的东西要容易得多,而不必担心代码在哪里执行。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。