如何解决Pandas-Dataframe Parallel Apply (Swifter, TQDM::process_map) 冻结?当被调用时
我有一个包含大约 15k 个音频文件路径的数据框,我想用它来执行操作(人为地添加噪音)。通常整个事情都可以工作,但即使记录较少(16)也需要很长时间。问题不在于函数的执行时间,而在于所有初始化之前的时间。
start = time.time()
data_augmented = data_augmented.swifter.progress_bar(True,desc="Merge Sounds") \
.apply(merge_sounds(**settings),axis=1)
print(f"{time.time-(start)} - Map Timer")
Merge Sounds: 100%|█████████████████████████████| 16/16 [00:07<00:00,2.09it/s]
26.973325729370117 - Map Timer
正如您在此处看到的,初始化所需的时间几乎是 Lambda 函数 (merge_sounds
) 运行时间的 4 倍。 initialization-time
我指的是 elapsed_time_measured_by_myself - elapsed_time_measured_by_tqdm
所以在这种情况下 26.97.. - 7 = 19.97
start = time.time()
lambda_fn = merge_sounds(**settings) #doesnt work if i put it in the line below.
data_augmented = process_map(lambda_fn,data_augmented,max_workers=threads,desc=f"Merge_sounds [{threads} Threads]")
print(f"{time.time-(start)} - Map Timer")
卡在:
Merge_sounds [16 Threads]: 0%| | 0/16 [00:00<?,?it/s]
with Pool(processes=16) as pool:
data_augmented = pool.map(merge_sounds(**settings),tqdm(data_augmented,desc=f"Merge Sounds: {16} Threads"))
卡在:
Merge Sounds: 16 Threads: 38%|██████ | 6/16 [00:00<00:00,4697.75it/s]
我知道并行化对于较小的数据集没有意义,我只是不明白为什么我可以轻松地并行化代码中所有地方的所有内容,而我在这里无法取得进展。后来我在大量数据上运行了这段代码,所以如果并行性可以工作,我会很高兴。
Map 中使用的函数是:
def merge_sounds(**settings):
_range = settings.get("snr_range",(0.15,0.65))
assert len(_range),"snr_range -> e.g. (0.15,0.75)"
target_sample_rate = settings.get("target_sample_rate","16000")
if "target_path" not in settings.keys():
raise Exception("please Specify target_path in Settings-Dict")
target_path = Path(settings["target_path"])
target_path.mkdir(parents=True,exist_ok=True)
def __call__(item):
_target_path = item["path_augmented"]
snr = round(uniform(_range[0],_range[1]),4)
pad_idx = item.name
yp,_ = IO.load(item["path"],sample_rate=target_sample_rate)
yn,_ = IO.load(item["path_noise"],sample_rate=target_sample_rate)
item["snr"] = snr
y_augmented = Effect.add_noise(yp,yn,snr=snr,pad_idx=pad_idx)
IO.save_wav(y_augmented,_target_path,target_sample_rate)
return item
return __call__
是否有什么我忘记并行化映射函数的事情(似乎在我的代码中的其他任何地方都可以使用这种变体之一,就像预期的那样)
Ty in Advanced。
解决方法
如果通过绕过 TQDM 解决了这个问题。我压缩了我需要的列
_paths_in = _df["path_input"]
_paths_out = _df["path_output"]
_path_noise = _df["path_noise"]
job = zip(_paths_in,_path_noise,_paths_out,_filter_jobs)
然后我只是将它传递给多处理器函数。
jobs = list(enumerate((zip_jobs(df))))
with Pool(processes=_threads) as pool:
data_augmented = pool.map(execute_job,tqdm(jobs,desc=f"Audio-Augmentation: {_threads} Threads"))
merge_sounds
和新的execute_job
类似,只是改变了,函数需要什么参数。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。