
How do I extract the first n rows per group from a Spark data frame, using the current versions of dplyr 1.0, sparklyr 1.4, and Spark 3.0 / Hadoop 2.7?

My attempts with top_n() and slice_head() both failed with errors.

An issue with top_n() was reported at https://github.com/tidyverse/dplyr/issues/4467 and closed by Hadley with the comment:

This will be resolved by #4687 + tidyverse/dbplyr#394 via the introduction of the new slice_min()/slice_max() functions, which also lets us resolve some interface issues with top_n().
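
For reference, on an ordinary local data frame the new dplyr 1.0 verbs behave as follows (a minimal sketch using the built-in iris data; the point of this question is that the same calls do not yet translate for a tbl_spark):

library(dplyr)

# Largest 3 Sepal.Length values per Species (local data frame only)
iris %>%
  group_by(Species) %>%
  slice_max(Sepal.Length, n = 3)

# Smallest 3 Sepal.Length values per Species
iris %>%
  group_by(Species) %>%
  slice_min(Sepal.Length, n = 3)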

Despite updating all my packages, calling top_n() fails with:

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'top_n_rank'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 3 pos 7

(See the full code and log below.)

OK, top_n() has now been superseded in dplyr 1.0, so I tried slice_head(). That also fails, with:

Error in UseMethod("slice_head") : 
  no applicable method for 'slice_head' applied to an object of class "c('tbl_spark','tbl_sql','tbl_lazy','tbl')"

I have only just started using sparklyr... Can anyone reproduce these problems? Or should I be looking for an installation problem or incompatible versions of some packages?

If the problem is confirmed, how else should I extract the first n rows per group from a Spark data frame?

Code example:

library(sparklyr)
library(dplyr)

# Just in case already connected
spark_disconnect_all()

# Connect to local Spark cluster
sc <- spark_connect(master = "local")

# Print the version of Spark
spark_version(sc = sc)

# copy data frame to Spark
iris_tbl <- copy_to(sc,iris)

# List the data frames available in Spark
src_tbls(sc)

# Get some info from the data frame
dim(iris_tbl)
glimpse(iris_tbl)

# Return the first 10 rows for each Species 
# Using top_n()
top_10 <- iris_tbl %>%
  group_by(Species) %>%
  top_n(10)
glimpse(top_10)

# Using slice_head()
slice_head_10 <- iris_tbl %>%
  group_by(Species) %>%
  slice_head(n = 10)
slice_head_10
glimpse(slice_head_10)

# disconnect from Spark
spark_disconnect(sc = sc)

# session Info
sessionInfo()

Full log (run in R Markdown):

Restarting R session...

> # Chunk 1: setup
> knitr::opts_chunk$set(echo = TRUE)
> 
> # Chunk 2
> library(sparklyr)
> library(dplyr)

Attaching package: ‘dplyr’

The following objects are masked from ‘package:stats’:

    filter, lag

The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union

> 
> # Just in case already connected
> spark_disconnect_all()
[1] 0
> 
> # Connect to local Spark cluster
> sc <- spark_connect(master = "local")
* Using Spark: 3.0.0
> 
> # Print the version of Spark
> spark_version(sc = sc)
[1] ‘3.0.0’
> 
> # copy data frame to Spark
> iris_tbl <- copy_to(sc,iris)
> 
> # List the data frames available in Spark
> src_tbls(sc)
[1] "iris"
> 
> # Get some info from the data frame
> dim(iris_tbl)
[1] NA  5
> glimpse(iris_tbl)
Rows: ??
Columns: 5
Database: spark_connection
$ Sepal_Length <dbl> 5.1, 4.9, 4.7, 4.6, 5.0, 5.4, 4.4, 4.8…
$ Sepal_Width  <dbl> 3.5, 3.0, 3.2, 3.1, 3.6, 3.9, 3.4, 2.9, 3.7, 3.4…
$ Petal_Length <dbl> 1.4, 1.4, 1.3, 1.5, 1.7, 1.6…
$ Petal_Width  <dbl> 0.2, 0.2, 0.4, 0.3, 0.1, 0.2…
$ Species      <chr> "setosa", "setosa", "setosa"…
> 
> # Return the first 10 rows for each Species 
> # Using top_n()
> top_10 <- iris_tbl %>%
+   group_by(Species) %>%
+   top_n(10)
Selecting by Species
> glimpse(top_10)
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'top_n_rank'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 3 pos 7
    at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.$anonfun$applyOrElse$102(Analyzer.scala:1852)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1852)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1843)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:96)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:129)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:139)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:139)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:96)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:87)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressions$1.applyOrElse(AnalysisHelper.scala:129)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$anonfun$resolveExpressions$1.applyOrElse(AnalysisHelper.scala:128)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$4(AnalysisHelper.scala:113)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:113)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressions(AnalysisHelper.scala:128)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveExpressions$(AnalysisHelper.scala:127)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressions(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1843)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1840)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
    at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:154)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler.$anonfun$channelRead0$1(handler.scala:58)
    at scala.util.control.Brea
> 
> # Using slice_head()
> slice_head_10 <- iris_tbl %>%
+   group_by(Species) %>%
+   slice_head(n = 10)
Error in UseMethod("slice_head") : 
  no applicable method for 'slice_head' applied to an object of class "c('tbl_spark','tbl')"
> slice_head_10
Error: object 'slice_head_10' not found
> glimpse(slice_head_10)
Error in glimpse(slice_head_10) : object 'slice_head_10' not found
> 
> # disconnect from Spark
> spark_disconnect(sc = sc)
> 
> # session Info
> sessionInfo()
R version 4.0.3 (2020-10-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.1 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/openblas-pthread/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/openblas-pthread/liblapack.so.3

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8       
 [4] LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8    LC_MESSAGES=C.UTF-8   
 [7] LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C          
[10] LC_TELEPHONE=C         LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_1.0.2.9000 sparklyr_1.4.0  

loaded via a namespace (and not attached):
 [1] pillar_1.4.6      compiler_4.0.3    dbplyr_1.4.4      r2d3_0.2.3       
 [5] base64enc_0.1-3   tools_4.0.3       digest_0.6.27     jsonlite_1.7.1   
 [9] lifecycle_0.2.0   tibble_3.0.4      pkgconfig_2.0.3   rlang_0.4.8      
[13] DBI_1.1.0         cli_2.1.0         rstudioapi_0.11   yaml_2.2.1       
[17] parallel_4.0.3    xfun_0.18         withr_2.3.0       httr_1.4.2       
[21] knitr_1.30        generics_0.0.2    htmlwidgets_1.5.2 vctrs_0.3.4      
[25] askpass_1.1       rappdirs_0.3.1    rprojroot_1.3-2   tidyselect_1.1.0 
[29] glue_1.4.2        forge_0.2.0       R6_2.4.1          fansi_0.4.1      
[33] purrr_0.3.4       tidyr_1.1.2       blob_1.2.1        magrittr_1.5     
[37] backports_1.1.10  ellipsis_0.3.1    htmltools_0.5.0   assertthat_0.2.1 
[41] config_0.3        utf8_1.1.4        openssl_1.4.3     crayon_1.3.4     
> 

Solution

Use filter() with row_number(). Note that you need to specify arrange() first for row_number() to work in sparklyr.

iris_tbl %>%
  group_by(Species) %>%
  arrange(Sepal_Length) %>%
  filter(row_number() <= 3)
#> # Source:     spark<?> [?? x 5]
#> # Groups:     Species
#> # Ordered by: Sepal_Length
#>   Sepal_Length Sepal_Width Petal_Length Petal_Width Species   
#>          <dbl>       <dbl>        <dbl>       <dbl> <chr>     
#> 1          4.9         2.4          3.3         1   versicolor
#> 2          5           2            3.5         1   versicolor
#> 3          5           2.3          3.3         1   versicolor
#> 4          4.9         2.5          4.5         1.7 virginica 
#> 5          5.6         2.8          4.9         2   virginica 
#> 6          5.7         2.5          5           2   virginica 
#> 7          4.3         3            1.1         0.1 setosa    
#> 8          4.4         2.9          1.4         0.2 setosa    
#> 9          4.4         3            1.3         0.2 setosa 
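
If you instead want top_n()-style semantics (the n largest values per group rather than the smallest), the same pattern should work with desc(), which dbplyr translates to a descending ORDER BY in the generated window function (a sketch along the same lines, not verified on every Spark version):

# Largest 3 Sepal_Length values per Species (top_n()-like behaviour)
iris_tbl %>%
  group_by(Species) %>%
  arrange(desc(Sepal_Length)) %>%
  filter(row_number() <= 3)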

