微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 R 中并行运行循环以进行 tar.gz 提取、NetCDF 处理和 CSV 文件输出

如何解决在 R 中并行运行循环以进行 tar.gz 提取、NetCDF 处理和 CSV 文件输出

首先,如果我在这里遗漏了这个问题的变体,我深表歉意。我试图阅读有关在 R 中并行化 for 循环的类似问题,但我找不到任何适合我的特定情况的内容

我正在寻找的概述是并行化从目录中读取 tar.gz 文件的循环,提取所需的 NetCDF 数据,格式化/处理数据,并将其写出到 CSV 文件。我想并行运行它们的原因是每个 tar.gz 文件都相当大(几乎 8GB)。我当前的脚本一个一个地遍历每个 tar.gz。这显然需要一点时间,所以我想通过将每个 tar.gz 文件分发给 cpu 上的单个进程来加快进程。

我知道 doParallelforeach 库似乎允许这种情况发生。不幸的是,我正在努力为我的情况实施逻辑。我将提供我的原始脚本的摘要版本以及我尝试使用 doParallel 的脚本,因为我的脚本有点冗长。请参阅以下内容

原始脚本

#clear memory
rm(list = ls())

#import needed libraries
library(ncdf4)
library(tidyverse)
library(reshape2)
library(readr)

#set working directory
setwd('path_to_working_directory')
#create path to iterate through in the for loop
folder <- getwd()

#create loop to iterate through folder and process data
for (x in dir(folder)) {
    print('Extracting from tar file.')
    tar.list <- untar(x,list = TRUE)
    untar(x,files = tar.list[c(2,5)],exdir = folder) #extract needed files from tar file
    print(paste('Extracted',tar.list[2]))
    print(paste('Extracted',tar.list[5]))
    unlink(x) #delete tar.gz file
    print(paste('Removed',x))
    print('Switching to NetCDF Process...')

   ### NetCDF CODE PORTION HERE ###
   
   #write to CSV and delete extracted files
   write.csv(processed_file,file = 'name_of_file.csv',row.names = FALSE)
   unlink(tar.list[c(2,5)])
}

print('Complete.')

可以看出非常简单的脚本。由于我一次处理目录中的五个 tar.gz 文件,因此我想利用 cpu 上的各个内核。我的 cpu 有八个内核,但为此我只想使用五个内核——每个文件一个。下面是我尝试并行执行与上述相同任务的代码

并行脚本

#clear memory
rm(list = ls())

library(doParallel) #this package also contains foreach functionality

#set working directory
setwd('path_to_working_directory')
#create path to iterate through in the for loop
folder <- getwd()

registerDoParallel(cl <- makeCluster(5)) 
results <- foreach(x = dir(folder),.packages = c('ncdf4','tidyverse','reshape2','readr')) %dopar% {
  print('Extracting from tar file.')
  #list contents of tar file
  tar.list <- untar(x,list = TRUE)
  untar(x,files =  tar.list[c(2,exdir = folder)
  print(paste('Extracted',tar.list[2]))
  print(paste('Extracted',tar.list[5]))
  unlink(x) #delete tar.gz file
  print(paste('Removed',x))
  print('Switching to NetCDF Process...')

  ### NetCDF CODE PORTION HERE ###

  #write to CSV and remove extracted files
  write.csv(processed_file,file = 'file_name_here.csv',row.names = FALSE)
  unlink(tar.list[c(2,5)])
}

stopCluster(cl)

当我运行这个脚本时,我可以在 Windows 任务管理器中看到多个生成的进程;但是,我没有看到任何单个进程的任何磁盘活动。显然,我做错了什么,希望有人指出正确的方向。

感谢您的帮助!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。