微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

重新分区 dask 数据帧,同时保持其原始序列

如何解决重新分区 dask 数据帧,同时保持其原始序列

我正在尝试重新分区多个 .parquet 文件以保存特定数量的镶木地板文件我有一个时间序列数据,它取决于每个客户端的观察次数(而不是时间戳),因此我需要确保分区不会将一个系列拆分为两个文件。此外,我想保留订单,因为我将标签存储在其他地方。这是我正在尝试做的一个例子:

import pandas as pd
import dask.dataframe as dd
ids = [9635,1536,8477,1088,6411,2251]

df = df = pd.DataFrame({
            "partition" : [0]*3 + [1]*3 + [2]*3 + [3]*3 + [4]*3 + [5]*3,"customer_id" : [ids[0]]*3 + [ids[1]]*3 + [ids[2]]*3 + [ids[3]]*3 + [ids[4]]*3 + [ids[5]]*3,"x": range(18)})
# indexing on "customer_id" here
df = df.set_index("customer_id")
ddf = dd.from_pandas(df,npartitions=6)
ddf.to_parquet("my_parquets")

read_ddf = dd.read_parquet("my_parquets/*.parquet")

last_idx = [ids[-1]]
my_divisions = ids + last_idx

read_ddf.divisions = my_divisions

# Split into two equal partitions with three customers each
new_divisions = [my_divisions[0],my_divisions[3],my_divisions[5]]
new_ddf = read_df.repartition(divisions=new_divisions)

引发错误

ValueError: New division must be sorted

我尝试了另一种方法,它涉及将“分区”列设置为索引并稍后将索引修改为“ids”,但这会对我的整个数据框进行排序,这是不受欢迎的,因为新序列不再与标签匹配存储。此处显示

import pandas as pd
import dask.dataframe as dd
ids = [9635,"x": range(18)})
# indexing on the defined "partition" instead
df = df.set_index("partition")
ddf = dd.from_pandas(df,npartitions=6)
ddf.to_parquet("my_parquets")

read_ddf = dd.read_parquet("my_parquets/*.parquet")

# my_range is equivalent to the list of partitions
my_range = [i for i in range(0,6)]
last_idx = [my_range[-1]]
my_divisions = my_range + last_idx

read_ddf.divisions = my_divisions

new_divisions = [0,2,4,5]

new_ddf = read_ddf.repartition(divisions=new_divisions)

# Need the "customer_id" as index
new_ddf = new_ddf.set_index("customer_id",drop = True)

但是这会按索引对数据框进行排序并弄乱结构,而我想保留原始顺序。

print("Partition 0")
print(new_ddf.get_partition(0).compute())
print("-------------------")

print("Partition 1")
print(new_ddf.get_partition(1).compute())
print("-------------------")

print("Partition 2")
print(new_ddf.get_partition(2).compute())
Partition 0
Empty DataFrame
Columns: [x]
Index: []
-------------------
Partition 1
              x
customer_id    
1088          9
1088         10
1088         11
1536          3
1536          4
1536          5
-------------------
Partition 2
              x
customer_id    
2251         15
2251         16
2251         17
6411         12
6411         13
6411         14
8477          6
8477          7
8477          8
9635          0
9635          1
9635          2

是否有针对此问题的解决方法?我知道 dask 中的 set_index 非常昂贵,但目前没有一种方法有效。另外,在我的例子中,我已经有了包含预处理数据的 .parquet 文件,所以我只使用 Pandas 创建了初始数据框以进行演示(如果在第一步中指定分区数会容易得多,如果我有熊猫中的所有数据)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?