How to summarise the standard deviation and count non-NA values in sparklyr
I have a large data.frame, and I have been using summarise together with across to compute summary statistics for a number of variables. Because of the size of the data.frame, I have had to start processing the data in sparklyr.
Since sparklyr does not support across, I am using summarise_each instead. The problem is that summarise_each in sparklyr does not seem to support sd or sum(!is.na(.)).
test <- data.frame(ID = c("Group1", "Group1", "Group1", "Group2", "Group2", "Group3", "Group3"),
                   Value1 = c(-100, -10, -5, 1, 2, 3, 4),
                   Value2 = c(50, 100, 10, 3, NA, NA, NA))
test %>%
  summarise(across(Value1:Value2, ~ sum(!is.na(.)), .names = "{col}_count"),
            across(Value1:Value2, ~ min(., na.rm = TRUE), .names = "{col}_min"),
            across(Value1:Value2, ~ max(., na.rm = TRUE), .names = "{col}_max"),
            across(Value1:Value2, ~ mean(., na.rm = TRUE), .names = "{col}_mean"),
            across(Value1:Value2, ~ sd(., na.rm = TRUE), .names = "{col}_sd"))
# A tibble: 1 x 10
Value1_count Value2_count Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sd Value2_sd
<int> <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 17 17 -100 -5 4 100 -5.53 11.2 24.7 25.8
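For reference, the same statistics can be written as a single across() call with a named list of functions; this is just an equivalent local-dplyr spelling (the columns come out grouped by variable rather than by statistic), and it still will not translate through sparklyr:

test %>%
  summarise(across(Value1:Value2,
                   list(count = ~ sum(!is.na(.)),
                        min   = ~ min(., na.rm = TRUE),
                        max   = ~ max(., na.rm = TRUE),
                        mean  = ~ mean(., na.rm = TRUE),
                        sd    = ~ sd(., na.rm = TRUE)),
                   .names = "{col}_{fn}"))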
I was also able to produce the same statistics with summarise_each, as shown below:
test %>%
  group_by(ID) %>%
  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE),
                      sd = sd(., na.rm = TRUE)))
ID Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sum Value2_sum
<fct> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 Group1 -100 -5 2 100 -17.4 23 -122 161
2 Group2 1 2 4 4 3.14 3.29 22 23
3 Group3 1 1 3 3 2 2 6 6
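Note that summarise_each() and funs() are deprecated in current dplyr. On a local data frame, the same grouped table can also be produced with summarise_at() and a named list of lambdas, for example:

test %>%
  group_by(ID) %>%
  summarise_at(vars(Value1:Value2),
               list(min  = ~ min(., na.rm = TRUE),
                    max  = ~ max(., na.rm = TRUE),
                    mean = ~ mean(., na.rm = TRUE),
                    sum  = ~ sum(., na.rm = TRUE),
                    sd   = ~ sd(., na.rm = TRUE)))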
Using sparklyr, I have been able to successfully compute min, max, mean, and sum, as follows:
sc <- spark_connect(master = "local", version = "2.4.3")
test <- spark_read_csv(sc = sc, path = "C:\\path\\test space.csv")
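As an aside, if you want to reproduce this without a CSV on disk, the local test data frame can be copied straight into Spark; copy_to() is standard sparklyr, and the table name here is just illustrative:

# Copy the local data frame into Spark instead of reading a CSV.
test_spark <- copy_to(sc, test, name = "test_spark", overwrite = TRUE)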
test %>%
  group_by(ID) %>%
  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE)))
# Source: spark<?> [?? x 9]
ID Value1_min Value_2_min Value1_max Value_2_max Value1_mean Value_2_mean Value1_sum Value_2_sum
<chr> <int> <int> <int> <int> <dbl> <dbl> <dbl> <dbl>
1 Group2 1 2 4 4 3.14 3.29 22 23
2 Group3 1 1 3 3 2 2 6 6
3 Group1 -100 -5 2 100 -17.4 23 -122 161
However, when I try to compute sd and sum(!is.na(.)), I get an error. Below are the code and the error message I receive. Is there any way to summarise these values?
test %>%
  group_by(ID) %>%
  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE),
                      sd = sd(., na.rm = TRUE)))
Error: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'AS' expecting ')'(line 1,pos 298)
== sql ==
SELECT `ID`,MIN(`Value1`) AS `Value1_min`,MIN(`Value_2`) AS `Value_2_min`,MAX(`Value1`) AS `Value1_max`,MAX(`Value_2`) AS `Value_2_max`,AVG(`Value1`) AS `Value1_mean`,AVG(`Value_2`) AS `Value_2_mean`,SUM(`Value1`) AS `Value1_sum`,SUM(`Value_2`) AS `Value_2_sum`,stddev_samp(`Value1`,TRUE AS `na.rm`) AS `Value1_sd`,stddev_samp(`Value_2`,TRUE AS `na.rm`) AS `Value_2_sd`
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^
FROM `test_space_30172a44_c0aa_4305_9a5e_d45fa77ba0b9`
GROUP BY `ID`
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sparklyr.Invoke.invoke(invoke.scala:147)
at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
at sparklyr.StreamHandler.read(stream.scala:61)
at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
at scala.util.control.Breaks.breakable(Breaks.scala:38)
at sparklyr.BackendHandler.channelRead0(handler.scala:38)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
In addition: Warning messages:
1: Named arguments ignored for SQL stddev_samp
2: Named arguments ignored for SQL stddev_samp
Solution
The problem is the na.rm argument. Spark's stddev_samp function has no such argument, and sparklyr does not appear to handle it. Missing values are always removed in SQL, so you do not need to specify na.rm.
test_spark %>%
  group_by(ID) %>%
  summarise_each(funs(min = min(.), max = max(.), mean = mean(.),
                      sum = sum(.), sd = sd(.)))
#> # Source: spark<?> [?? x 11]
#> ID Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean
#> <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 Group2 1 2 4 4 3.14 3.29
#> 2 Group1 -100 -5 2 100 -17.4 23
#> 3 Group3 1 1 3 3 2 2
#> Value1_sum Value2_sum Value1_sd Value2_sd
#> <dbl> <dbl> <dbl> <dbl>
#> 1 22 23 1.21 0.951
#> 2 -122 161 36.6 38.6
#> 3 6 6 1 1
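One way to double-check the fix is to inspect the SQL that sparklyr generates. show_query() prints the translated query without running it; the stddev_samp calls should now contain only the column argument. A quick sketch, assuming the same test_spark table:

test_spark %>%
  group_by(ID) %>%
  summarise_each(funs(sd = sd(.))) %>%
  show_query()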
This appears to be a bug specific to summarise; sd with na.rm does work in mutate, for example:
test_spark %>%
  group_by(ID) %>%
  mutate_each(funs(sd = sd(., na.rm = TRUE)))
As for sum(!is.na(.)), you just need to write it in a form Spark SQL can aggregate; see the sketch below.
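Here is a minimal sketch of the non-NA count on the Spark side, assuming the same test_spark table. SQL's COUNT(col) skips NULLs (count() has no dbplyr translation, so it is passed through to Spark's COUNT), and casting the logical to an integer is an alternative if you prefer to keep the sum(!is.na(.)) spelling:

test_spark %>%
  group_by(ID) %>%
  summarise(Value1_count = count(Value1),                    # COUNT() ignores NULLs
            Value2_count = sum(as.integer(!is.na(Value2))))  # cast the logical so Spark can SUM it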