How to preprocess oddly laid-out data in PySpark
I am working with a set of climate data that is laid out very oddly and is hard to handle. I decided to use PySpark because it is a large amount of data and, you know, this is about saving time.
The data format is .ascii/.text/.dat, call it what you like, and it is laid out as follows:
Date 1 | |||||
---|---|---|---|---|---|
Value 1 | Value 2 | Value 3 | Value 4 | Value 5 | Value 6 |
Value 7 | Value 8 | Value 9 | Value 10 | Value 11 | Value 12 |
. | . | . | . | . | Value 101178 |
Date 2 | |||||
Value 1 | Value 2 | Value 3 | Value 4 | Value 5 | Value 6 |
Value 7 | Value 8 | Value 9 | Value 10 | Value 11 | Value 12 |
. | . | . | . | . | Value 101178 |
That is, each block is a table of 101178 values per date, spread over 6 columns (16863 rows).
In case the explanation is not very clear, I attach a link to a small fragment of the file. (The original file is >50 GB.)
https://drive.google.com/file/d/1-aJRTWzpQ5lHyZgt-h7DuEY5GpYZRcUh/view?usp=sharing
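To make the layout concrete, here is a small sketch that writes a miniature file in the same shape; the exact header fields and the `sample.dat` name are assumptions based on the description above:

```python
# Sketch: write a tiny file in the same layout as the climate data,
# i.e. a 10-digit date header line followed by rows of 6 values each.
# The extra header fields ("0 0 <count>") are an assumption.
dates = ["1990010100", "1990010101"]
values_per_date = 24  # the real file has 101178 values per date

with open("sample.dat", "w") as f:
    for d in dates:
        f.write(f"   {d}   0   0   {values_per_date}\n")
        vals = [f"{v:07.4f}" for v in range(values_per_date)]
        for i in range(0, values_per_date, 6):
            f.write("   " + "   ".join(vals[i:i + 6]) + "\n")
```

With 24 values per date this produces 1 header line plus 4 value rows per date block, mirroring the structure described above at a toy scale.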
My idea is to generate a matrix with the following structure:
Date 1 | Date 2 | Date n |
---|---|---|
Value1.1 | Value1.2 | Value1.n |
Value2.1 | Value2.2 | Value2.n |
Valuen.1 | Valuen.2 | Valuen.n |
I have tried to state the problem as clearly as possible. As I said, I am using PySpark, so if anyone has a solution for this kind of data handling with that tool, I would be very grateful.
Thank you all very much!
Solution
Following your comment I have revised my answer: instead I used pandas and its dataframes, which should translate directly to Spark if necessary.
Also, I think your data is corrupted, because the last array does not have the correct length. My code cannot handle that, so you would need to use the expected count captured by the regex, e.g. expected_values = m.group(4), etc.
Warning: the := operator requires Python 3.8+, but you can work around it if needed.
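If you are stuck on an older Python, the loop does not strictly need the walrus operator; a minimal sketch of the alternative:

```python
import io

# Python < 3.8 alternative to `while line := ifp.readline():`
# a text file object (or any line-based stream) is itself an
# iterator over its lines, so you can loop over it directly.
def count_lines(stream):
    n = 0
    for line in stream:  # replaces the readline()/walrus loop
        n += 1
    return n
```

The same `for line in ifp:` pattern can replace the `while` loop in the code below without changing its behavior.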
Note:
- The header of each "section" is captured by the regex and used to form the column name.
Split the file at the date lines:
import pandas as pd
import numpy as np
import re

header = re.compile(r"^\s+(\d{10})\s+(\d*)\s+(\d*)\s+(\d*)$")
df = pd.DataFrame()

with open("t2.dat", "r") as ifp:
    rows = []
    date = None
    while line := ifp.readline():
        # A header line starts a new date block
        if m := header.match(line):
            # We have a header, so convert the accumulated rows to an
            # array, then flatten to a vector before adding it as a
            # column of the dataframe.
            if rows and date:
                df[date] = np.array(rows, dtype=float).flatten(order="C")
            rows = []
            # Capture the date for the block that follows
            date = m.group(1)
        else:
            rows.append(line.strip().split())

# The last block is not followed by a header, so append it here
print(f"Appending the last {len(rows)*len(rows[0])} values")
df[date] = np.array(rows, dtype=float).flatten(order="C")
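The flatten(order="C") call is what turns each 16863×6 block into a single 101178-long column in file order; a minimal illustration with a 2×3 block:

```python
import numpy as np
import pandas as pd

# Row-major ("C" order) flattening preserves file order:
# left to right within a row, then row by row.
rows = [["1", "2", "3"],
        ["4", "5", "6"]]
col = np.array(rows, dtype=float).flatten(order="C")
df = pd.DataFrame({"1990010100": col})
```

Here `col` comes out as [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], i.e. exactly the order in which the values appeared in the original lines.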
The output in abbreviated form (1 column per date, 101178 rows):
1990010100 1990010101 1990010102 1990010103 1990010104
0 10.4310 10.0490 9.7269 9.3801 9.0038
1 10.3110 9.9225 9.5431 9.1758 8.7899
2 10.2290 9.8144 9.4156 9.0304 8.6171
3 10.1500 9.7154 9.2999 8.8890 8.4713
4 9.8586 9.3968 8.9156 8.4328 7.9764
... ... ... ... ... ...
101173 -1.5511 -1.5472 -1.5433 -1.5251 -1.5399
101174 -1.8659 -1.8719 -1.8485 -1.8481 -1.8325
101175 -1.9044 -1.8597 -1.7963 -1.8094 -1.7653
101176 -2.0564 -2.0404 -1.9779 -1.9893 -1.9521
101177 -2.1842 -2.2840 -2.3216 -2.2794 -2.2655
I managed to get very close to your expected dataframe structure, but not exactly.
To test the code's output I made this dummy dataset to play with, since tracking the huge number of values in the original dataset is very hard:
+--------------------------------------------+
|value |
+--------------------------------------------+
| 1990010100 0 0 24|
| 001 002 003 004 005 006 |
| 007 008 009 010 011 012 |
| 013 014 015 016 017 018 |
| 019 020 021 022 023 024 |
| 1990010101 0 0 24|
| 101 102 103 104 105 106 |
| 107 108 109 110 111 112 |
| 113 114 115 116 117 118 |
| 119 120 121 122 123 124 |
| 1990010102 0 0 24|
| 201 202 203 204 205 206 |
| 207 208 209 210 211 212 |
| 213 214 215 216 217 218 |
| 219 220 221 222 223 224 |
+--------------------------------------------+
Here is the full code I tested. The main idea is to tag each date block and its records so that they can be joined together.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W
# I'm not sure what cluster you have, or whether you'll run this on your local machine, so I added this piece for reference. The most important thing is to provide enough resources to handle the entire dataset at once.
spark = (SparkSession
.builder
.master('local[*]')
.appName('SO')
.config('spark.driver.cores','4')
.config('spark.driver.memory','4g')
.config('spark.executor.cores','4')
.config('spark.executor.memory','4g')
.getOrCreate()
)
# load raw weird data and apply some transformation to make it "readable" or "processable"
df = (spark
    .read.text('t2.dat')
    .withColumn('id', F.row_number().over(W.orderBy(F.lit(1))))  # an ID per row, very important for marking the dates
    .withColumn('value', F.trim(F.col('value')))                 # trim leading/trailing spaces
    .withColumn('value', F.split(F.col('value'), r'\s+'))        # turn each line into an array of individual values
)
# +------------------------------+---+
# |value                         |id |
# +------------------------------+---+
# |[1990010100, 0, 0, 24]        |1  |
# |[001, 002, 003, 004, 005, 006]|2  |
# |[007, 008, 009, 010, 011, 012]|3  |
# |[013, 014, 015, 016, 017, 018]|4  |
# |[019, 020, 021, 022, 023, 024]|5  |
# |[1990010101, 0, 0, 24]        |6  |
# |[101, 102, 103, 104, 105, 106]|7  |
# |[107, 108, 109, 110, 111, 112]|8  |
# |[113, 114, 115, 116, 117, 118]|9  |
# |[119, 120, 121, 122, 123, 124]|10 |
# |[1990010102, 0, 0, 24]        |11 |
# |[201, 202, 203, 204, 205, 206]|12 |
# |[207, 208, 209, 210, 211, 212]|13 |
# |[213, 214, 215, 216, 217, 218]|14 |
# |[219, 220, 221, 222, 223, 224]|15 |
# +------------------------------+---+
# Extracting available date blocks
date_df = (df
    .where(F.size('value') == 4)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int'))  # replace 5 with 16864 (1 header + 16863 value rows) on your actual dataset
    .select('grp', F.col('value')[0].alias('date'))
)
date_df.show(10,False)
# +---+----------+
# |grp|date      |
# +---+----------+
# |0  |1990010100|
# |1  |1990010101|
# |2  |1990010102|
# +---+----------+
# Extracting available value blocks
value_df = (df
    .where(F.size('value') == 6)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int'))  # replace 5 with 16864 on your actual dataset
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))
    .withColumn('value', F.flatten('value'))
)
# +---+------------------------------------------------------------------------------------------------------------------------+
# |grp|value                                                                                                                   |
# +---+------------------------------------------------------------------------------------------------------------------------+
# |0  |[001, 002, 003, 004, 005, 006, 007, 008, 009, 010, 011, 012, 013, 014, 015, 016, 017, 018, 019, 020, 021, 022, 023, 024]|
# |1  |[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124]|
# |2  |[201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224]|
# +---+------------------------------------------------------------------------------------------------------------------------+
# join them together and "explode" array to different rows
joined_df = (date_df
    .join(value_df, on=['grp'])
    .withColumn('value', F.explode('value'))
)
# +---+----------+-----+
# |grp|date |value|
# +---+----------+-----+
# |0 |1990010100|001 |
# |0 |1990010100|002 |
# |0 |... |... |
# |0 |1990010100|023 |
# |0 |1990010100|024 |
# |1 |1990010101|101 |
# |1 |1990010101|102 |
# |1 |... |... |
# |1 |1990010101|123 |
# |1 |1990010101|124 |
# |2 |1990010102|201 |
# |2 |1990010102|202 |
# |2 |... |... |
# |2 |1990010102|223 |
# |2 |1990010102|224 |
# +---+----------+-----+
# Now joined_df basically holds your entire dataset; it's totally up to you how you want to handle it.
# One option is to save each date as a partition of one Hive table.
# Another option is to save each date as a separate file.
# Either way, it's just for simplicity whenever you need to read that painful dataset again.
for date in [row['date'] for row in date_df.collect()]:
    (joined_df
        .where(F.col('date') == date)
        .write
        .mode('overwrite')
        .csv(date)
    )