微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

仅使用Spark-SQL API时广播变量的使用 1广播变量如何在内部工作? 2是否可以在Spark-SQL中利用此机制?

如何解决仅使用Spark-SQL API时广播变量的使用 1广播变量如何在内部工作? 2是否可以在Spark-SQL中利用此机制?

使用Spark-RDD API时,我们可以使用广播变量来优化spark分配不可变状态的方式。

1)广播变量如何在内部工作?

我的假设是: 每个用于对数据集执行操作的闭包都必须对其所有引用的变量进行序列化,通过网络传输并与任务一起还原,以便可以执行闭包。

注册这样的广播变量时​​:

val broadcastvar = sc.broadcast("hello world")

返回的对象(broadcast[String])不保留对实际对象(“ hello world”)的引用,而仅保留一些ID。 当从如上所述的闭包中引用广播变量句柄时,它将像其他变量一样被序列化-只是广播变量句柄本身不包含实际对象。

稍后在目标节点上执行闭包时,实际对象(“ hello world”)已被传输到每个节点。当闭包到达调用broadcastvar.value的地步时,广播变量句柄在内部使用ID检索实际对象。

这个假设正确吗?

2)是否可以在Spark-sql中利用此机制?

假设我有一组允许的值。

使用RDD-API时,我将为我的allowedValues创建一个广播变量:

val broadcastAllowedValues = sc.broadcast(allowedValues) // broadcast[Set[String]]

rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))

自然,当使用Spark-sql-API时,我会为此使用Column.isin / Column.isInCollection方法

dataframe.where(col("mycol").isInCollection(allowedValues))

但是看来我无法以这种方式获得广播变量的优势。

此外,如果我将这段代码更改为以下代码

val broadcastAllowedValues = sc.broadcast(allowedValues) // broadcast[Set[String]]

dataframe.where(col("mycol").isInCollection(allowedValues.value))

这部分:

col("mycol").isInCollection(allowedValues.value)
// and more important this part:
allowedValues.value

已经在驱动程序上进行了评估,从而产生了一个新的Column对象。因此,广播变量在这里失去了优势。与第一个示例相比,它甚至会有一些开销...

是否有一种方法可以使用Spark-sql-API来利用broadcast-Variable,还是在这些时候必须显式使用RDD-API?

解决方法

广播变量如何在内部工作?

广播的数据被序列化并实际移动到所有执行器。根据{{​​3}}上的文档说,

“广播变量允许程序员在每台计算机上保留一个只读变量,而不是将其副本与任务一起发送。”

在Spark-SQL中是否可以利用这种机制?

是的,有一种利用优势的方法。在加入大小型数据框时,Spark默认会应用 Broadcast Hash Join

根据“ Learning Spark-第二版”这本书,内容为:

“默认情况下,如果较小的数据集小于10MB,Spark将使用广播联接。此配置在spark.sql.autoBroadcastJoinThreshold中进行设置;您可以根据每个执行器上的内存大小来减小或增大大小并在驱动程序中。”

在您的情况下,您需要将所有唯一的 allowedValues 列出到一个只有一列(称为allowedeValuesDF的列)的简单DataFrame(称为allowValues的数据帧)中,并应用联接来过滤您的dataframe

类似这样的东西:

import org.apache.spark.sql.functions.broadcast
val result = dataframe.join(broadcast(allowedValuesDF),"mycol === allowedValues")

实际上,您可以省略broadcast,因为Spark默认情况下会进行广播连接。

编辑:

在更高版本的Spark中,您还可以在SQL语法中使用 join提示来告诉执行引擎使用哪种策略。 Broadcast Variables中提供了详细信息,下面提供了一个示例:

-- Join Hints for broadcast join 
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。