微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将 R 中列的最后 N 天覆盖到雪花

如何解决将 R 中列的最后 N 天覆盖到雪花

出于示例目的,我使用了 tidyquant 数据集。

install.packages('tidyquant')
library(tidyquant)
library(data.table)
options("getSymbols.warning4.0"=FALSE)
options("getSymbols.yahoo.warning"=FALSE)
# Downloading Apple price using quantmod
first.date <- Sys.Date() - 30
last.date <- Sys.Date()

getSymbols("AAPL",from = first.date,to = last.date,warnings = FALSE,auto.assign = TRUE)
AAPL<-data.frame(AAPL)
AAPL<- setDT(AAPL,keep.rownames = TRUE)[]
colnames(AAPL)[1] <- "DATE"
AAPL$RUNDATE<-AAPL$DATE
head(AAPL)

         DATE AAPL.Open AAPL.High AAPL.Low AAPL.Close AAPL.Volume AAPL.Adjusted      RUNDATE
1: 2021-03-19    119.90    121.43   119.68     119.99   185023200        119.99   2021-03-19
2: 2021-03-22    120.33    123.87   120.26     123.39   111912300        123.39   2021-03-22
3: 2021-03-23    123.33    124.24   122.14     122.54    95467100        122.54   2021-03-23
4: 2021-03-24    122.82    122.90   120.07     120.09    88530500        120.09   2021-03-24
5: 2021-03-25    119.54    121.66   119.00     120.59    98844700        120.59   2021-03-25
6: 2021-03-26    120.35    121.48   118.92     121.21    93958900        121.21   2021-03-26

这是我的目标,我已经想出了如何使用 Databricks 中的 R 使用以下命令将此数据集上传到雪花中:

sparkr.sdf <- SparkR::createDataFrame(AAPL)

SparkR::write.df(
  df = sparkr.sdf,source = "sNowflake",mode = "overwrite",sfUrl = "<sNowflake-url>",sfUser = user,sfPassword = password,sfDatabase = "<sNowflake-database>",sfSchema = "<sNowflake-schema>",sfWarehouse = "<sNowflake-cluster>",dbtable = "AAPL_table"
)

但是,我想每天运行并上传最新日期的拉取(过去 30 天),附加并覆盖 RunDate 列中的过去 30 天。

我的想法是使用上面相同的查询,只是将 rundate 列更改为今天的日期。 AAPL$RUNDATE<-Sys.Date() 每天。这样,当我运行它时,我可以从这个新的 AAPL$RUNDATE 中减去 30 天,并且只保留 AAPL$RUNDATE 前进 30 天时间范围内的任何内容

我看到了这段代码,但没有看到设置回溯限制的选项

DBI::dbSendQuery(sNowflake.conn,"use schema schemaname") # strange this is required

dbWriteTable(sNowflake.conn,'tablename',df,append = T,overwrite = F,verbose = T) 

或者这个 source:

# Write table
sparklyr.sdf <- copy_to(sc,iris,overwrite = TRUE)

sparklyr.sdf %>%
  spark_write_source(
   sc = sc,options = sf_options
  )

这个想法是我每天运行这个数据拉取过去 30 天的数据,而不是整个时间段,并附加到现有数据集。

以前有没有人做过这样的事情,非常感谢您对此提供的帮助,如果您需要更清楚地了解我要完成的工作,请告诉我

解决方法

假设您运行代码以获取新数据并将其存储在名为 AAPL 的变量中,那么

AAPL$RUNDATE <- Sys.Date()
DBI::dbWriteTable(conn,AAPL,append = TRUE)

从这里开始,我认为您有两个选择:

  1. 删除超过 30 天的数据。将新数据附加到表后,然后运行

    DBI::dbExecute(conn,"delete from AAPL_table
       where DATEDIFF(day,RUNDATE,CURRENT_DATE()) > 30")
    

    (使用来自 DATEDIFFCURRENT_DATE 的示例形成。)

    作为替代方案,如果您不今天运行清算,您可能更喜欢使用表中观察到的最大值 RUNDATE 而不是今天的日期,在这种情况下,您可能会用到

    maxdate <- DBI::dbGetQuery(conn,"select max(RUNDATE) as maxdate from AAPL_table")$maxdate
    if (length(maxdate) && !anyNA(maxdate)) {
      DBI::dbExecute(conn,"delete from AAPL_table where DATEDIFF(day,?) > 30",params = list(maxdate))
    }
    
  2. 保留表中的旧数据,无论何时从雪花实例中检索它,只检索过去 30 天的价值:

    x <- DBI::dbGetQuery(conn,"select ... from AAPL_table where RUNDATE >= ?",params = list(Sys.Date()))
    

(注意:我没有snowflake,所以这个是未经测试的。我在sql server中测试了它的前提没有问题,并验证了至少支持两个sql函数。)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。