如何解决如何通过子查询和地图查找进行高阶函数转换?
这是我之前的question
的后续问题scala> val map1 = spark.sql("select map('s1','p1','s2','p2','s3','p3') as lookup")
map1:org.apache.spark.sql.DataFrame = [查找:map
]
scala> val ds1 = spark.sql("select 'p1' as p,Array('s2','s3') as c")
ds1:org.apache.spark.sql.DataFrame = [p:字符串,c:数组]
scala> ds1.createOrReplaceTempView("ds1")
scala> map1.createOrReplaceTempView("map1")
scala> map1.show()
+--------------------+
| lookup|
+--------------------+
|[p1 -> s1,p2 -> ...|
+--------------------+
scala> ds1.show()
+---+--------+
| p| c|
+---+--------+
| p1|[s2,s3]|
+---+--------+
map1.selectExpr("element_at(`lookup`,'s2')").first()
res50:org.apache.spark.sql.Row = [p2]
scala> spark.sql("select element_at(`lookup`,'s1') from map1").show()
+----------------------+
|element_at(lookup,s1)|
+----------------------+
| p1|
+----------------------+
到目前为止一切顺利。在接下来的两个步骤中,我遇到了一些问题:
scala> ds1.selectExpr("p","c","transform(c,cs -> map1.selectExpr('element_at(`lookup`,cs)')) as cs").show()
20/09/28 19:44:59警告HiveConf:名称的HiveConf hive.stats.jdbc.timeout不存在20/09/28 19:44:59警告 HiveConf:名称为hive.stats.retries.wait的HiveConf不存在 20/09/28 19:45:03 WARN ObjectStore:在以下版本中找不到版本信息 元商店。 hive.metastore.schema.verification未启用,因此 记录架构版本2.3.0 20/09/28 19:45:03 WARN ObjectStore: 调用了setMetaStoreSchemaVersion,但禁用了录制版本: 版本= 2.3.0,注释=由MetaStore root@10.1.21.76设置20/09/28 19:45:03 WARN ObjectStore:无法获取数据库map1,返回 NoSuchObjectException org.apache.spark.sql.AnalysisException: 未定义的函数:“ selectExpr”。此功能既不是 已注册的临时功能或已注册的永久功能 数据库“ map1”。第1行pos 19
scala> spark.sql("""select p,c,transform(c,cs -> (select element_at(`lookup`,cs) from map1)) cc from ds1""").show()
org.apache.spark.sql.AnalysisException:无法解析给定的'
cs
' 输入列:[map1.lookup];第1行pos 61; 'Project [p#329,c#330, 变换(c#330,lambdafunction(标量子查询#713 [],lambda cs#715, false))AS cc#714]:+-'项目 [unresolvedalias('element_at(lookup#327,'cs),None)]:+- SubqueryAlias map1:+-专案[map(s1,p1,s2,p2,s3,p3)AS lookup#327]:+-OneRowRelation +-SubqueryAlias ds1 +-项目[p1 AS p#329,array(s2,s3)AS c#330] +-OneRowRelatio
我该如何解决这些问题?
解决方法
如果map1
的行太多,则可以对从c
列中从数组中提取的所有值进行交叉连接。
spark.sql("select col as value,element_at(map1.lookup,col) as key +
"from (select explode(ds1.c) from ds1) as v cross join map1")
结果(将上述内容分配给DataFrame类型的值,然后调用.show
):
+-----+---+
|value|key|
+-----+---+
| s2| p2|
| s3| p3|
+-----+---+
,
只需将表名添加到from
子句中。
spark.sql("""select p,c,transform(c,cs -> element_at(`lookup`,cs)) cc from ds1 a,map1 b""").show()
+---+--------+--------+
| p| c| cc|
+---+--------+--------+
| p1|[s2,s3]|[p2,p3]|
+---+--------+--------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。