Preprocessing weird data in pyspark

I'm working with a set of climate data that is laid out in a very strange way and is hard to work with. I decided to use pyspark because it is a large amount of data and, well, to save time.

The data is in .ascii/.text/.dat format (call it whatever you like) and is laid out as follows:


Date 1
Value 1   Value 2   Value 3   Value 4    Value 5    Value 6
Value 7   Value 8   Value 9   Value 10   Value 11   Value 12
.         .         .         .          .          Value 101178
Date 2
Value 1   Value 2   Value 3   Value 4    Value 5    Value 6
Value 7   Value 8   Value 9   Value 10   Value 11   Value 12
.         .         .         .          .          Value 101178

That is, each date is followed by a table of 101,178 values spread over 6 columns (16,863 rows).

In case the explanation isn't clear enough, here is a link to a small excerpt of the file (the original file is >50 GB):

https://drive.google.com/file/d/1-aJRTWzpQ5lHyZgt-h7DuEY5GpYZRcUh/view?usp=sharing

My idea is to generate a matrix with the following structure:


Date 1       Date 2       Date n
Value 1.1    Value 1.2    Value 1.n
Value 2.1    Value 2.2    Value 2.n
Value n.1    Value n.2    Value n.n

I have tried to explain the problem as clearly as I can. As I said, I'm working with pyspark, so if anyone has a solution for this kind of data processing with that tool, I would be very grateful.

Thank you very much, everyone!

Solution

Following your comment I have reworked my answer; instead I have used 'pandas' and its dataframe, which should translate directly to Spark if necessary (a conversion sketch follows the output below).

Also, I think your data file is corrupt, since the last array has the wrong length. My code cannot handle this, so you would need to use the count captured by the regex, e.g. expected_values = m.group(4), to guard against it (see the sketch after the code below).

Warning: the := operator requires Python 3.8... but you can fix that if you need to, as sketched below.
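
One minimal way to fix it, iterating over the file object instead of using the walrus operator (the loop body stays exactly as in the code further down):

# Pre-3.8 equivalent of `while line := ifp.readline():`
for line in ifp:
    ...  # same loop body as in the code below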

Note:

  • The header of each "section" is captured by the regex and used to form the column name

Split the file at the date lines:

import pandas as pd
import numpy as np
import re

header = re.compile(r"^\s+(\d{10})\s+(\d*)\s+(\d*)\s+(\d*)$")

df = pd.DataFrame()

with open("t2.dat", "r") as ifp:
    rows = []
    date = None

    while line := ifp.readline():
        # Get the header and start a new file
        if m := header.match(line):
            # We have a header so convert to array then flatten to a vector
            # before appending to the dataframe.
            if rows and date:
                df[date] = np.array(rows, dtype=float).flatten(order="C")
                rows = []

            # Get the header
            date = m.group(1)

        else:
            rows.append(line.strip().split())

    print(f"Appending the last {len(rows)*len(rows[0])} values")
    df[date] = np.array(rows, dtype=float).flatten(order="C")
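
On the corruption point above, a minimal sketch of how the count captured from the header could guard each append. `append_block` is a hypothetical helper, and the assumption that the last header field (m.group(4)) holds the expected number of values is unverified:

# Hypothetical helper: append a block only if its length matches the count
# captured from the header, assuming `expected` was stored via int(m.group(4))
# at the same time as `date = m.group(1)`.
def append_block(df, date, rows, expected):
    values = np.array(rows, dtype=float).flatten(order="C")
    if expected is not None and len(values) != expected:
        print(f"{date}: expected {expected} values, got {len(values)} - skipped")
        return
    df[date] = values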

This outputs an abbreviated form (1 column per date, 101178 rows):

        1990010100  1990010101  1990010102  1990010103  1990010104
0          10.4310     10.0490      9.7269      9.3801      9.0038
1          10.3110      9.9225      9.5431      9.1758      8.7899
2          10.2290      9.8144      9.4156      9.0304      8.6171
3          10.1500      9.7154      9.2999      8.8890      8.4713
4           9.8586      9.3968      8.9156      8.4328      7.9764
...            ...         ...         ...         ...         ...
101173     -1.5511     -1.5472     -1.5433     -1.5251     -1.5399
101174     -1.8659     -1.8719     -1.8485     -1.8481     -1.8325
101175     -1.9044     -1.8597     -1.7963     -1.8094     -1.7653
101176     -2.0564     -2.0404     -1.9779     -1.9893     -1.9521
101177     -2.1842     -2.2840     -2.3216     -2.2794     -2.2655
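
As for the Spark translation mentioned above, the pandas result can be handed straight to a session; a minimal sketch, assuming an already-created SparkSession named spark:

# Minimal sketch: `spark` is assumed to be an existing SparkSession and `df`
# the pandas frame built above; reset_index keeps the row position as a column.
sdf = spark.createDataFrame(df.reset_index())
sdf.printSchema()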

I managed to get very close to your expected dataframe structure, but not quite all the way.

To test the code's output I made this dummy dataset to play with, since keeping track of the huge numbers in the original dataset is very difficult:

+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|  1990010100           0           0      24|
|  001  002  003  004  005  006              |
|  007  008  009  010  011  012              |
|  013  014  015  016  017  018              |
|  019  020  021  022  023  024              |
|  1990010101           0           0      24|
|  101  102  103  104  105  106              |
|  107  108  109  110  111  112              |
|  113  114  115  116  117  118              |
|  119  120  121  122  123  124              |
|  1990010102           0           0      24|
|  201  202  203  204  205  206              |
|  207  208  209  210  211  212              |
|  213  214  215  216  217  218              |
|  219  220  221  222  223  224              |
+--------------------------------------------+

Here is the full code I tested. The main idea is to tag the date blocks and their records in some way so that they can be joined to each other.

from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W

# I'm not sure what cluster you have, or whether you're going to run this on
# your local machine, so I added this piece here for your reference. The most
# important thing is to provide enough resources to handle the entire dataset
# at once.
spark = (SparkSession
    .builder
    .master('local[*]')
    .appName('SO')
    .config('spark.driver.cores', '4')
    .config('spark.driver.memory', '4g')
    .config('spark.executor.cores', '4')
    .config('spark.executor.memory', '4g')
    .getOrCreate()
)

# Load the raw weird data and apply some transformations to make it "readable" or "processable".
df = (spark
    .read.text('t2.dat')
    .withColumn('id', F.row_number().over(W.orderBy(F.lit(1)))) # add an ID per row, very important to mark the dates
    .withColumn('value', F.trim(F.col('value')))                # trim spaces before and after
    .withColumn('value', F.split(F.col('value'), r'\s+'))       # turn a single line into individual values
)
# +------------------------------+---+
# |value                         |id |
# +------------------------------+---+
# |[1990010100, 0, 0, 24]        |1  |
# |[001, 002, 003, 004, 005, 006]|2  |
# |[007, 008, 009, 010, 011, 012]|3  |
# |[013, 014, 015, 016, 017, 018]|4  |
# |[019, 020, 021, 022, 023, 024]|5  |
# |[1990010101, 0, 0, 24]        |6  |
# |[101, 102, 103, 104, 105, 106]|7  |
# |[107, 108, 109, 110, 111, 112]|8  |
# |[113, 114, 115, 116, 117, 118]|9  |
# |[119, 120, 121, 122, 123, 124]|10 |
# |[1990010102, 0, 0, 24]        |11 |
# |[201, 202, 203, 204, 205, 206]|12 |
# |[207, 208, 209, 210, 211, 212]|13 |
# |[213, 214, 215, 216, 217, 218]|14 |
# |[219, 220, 221, 222, 223, 224]|15 |
# +------------------------------+---+

# Extracting available date blocks
date_df = (df
    .where(F.size('value') == 4)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int')) # replace 5 with 16864 when run with your actual dataset
    .select('grp', F.col('value')[0].alias('date'))
)
date_df.show(10, False)
# +---+----------+
# |grp|date      |
# +---+----------+
# |0  |1990010100|
# |1  |1990010101|
# |2  |1990010102|
# +---+----------+

# Extracting available value blocks
value_df = (df
    .where(F.size('value') == 6)
    .withColumn('grp', ((F.col('id') - 1) / 5).cast('int')) # replace 5 with 16864 when run with your actual dataset
    .groupBy('grp')
    .agg(F.collect_list('value').alias('value'))
    .withColumn('value', F.flatten('value'))
)
# +---+------------------------------------------------------------------------------------------------------------------------+
# |grp|value                                                                                                                   |
# +---+------------------------------------------------------------------------------------------------------------------------+
# |0  |[001, 002, 003, 004, 005, 006, 007, 008, 009, 010, 011, 012, 013, 014, 015, 016, 017, 018, 019, 020, 021, 022, 023, 024]|
# |1  |[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124]|
# |2  |[201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224]|
# +---+------------------------------------------------------------------------------------------------------------------------+

# join them together and "explode" array to different rows
joined_df = (date_df
    .join(value_df, on=['grp'])
    .withColumn('value', F.explode('value'))
)
# +---+----------+-----+
# |grp|date      |value|
# +---+----------+-----+
# |0  |1990010100|001  |
# |0  |1990010100|002  |
# |0  |...       |...  |
# |0  |1990010100|023  |
# |0  |1990010100|024  |
# |1  |1990010101|101  |
# |1  |1990010101|102  |
# |1  |...       |...  |
# |1  |1990010101|123  |
# |1  |1990010101|124  |
# |2  |1990010102|201  |
# |2  |1990010102|202  |
# |2  |...       |...  |
# |2  |1990010102|223  |
# |2  |1990010102|224  |
# +---+----------+-----+

# Now joined_df is basically holding your entire dataset; it's totally up to you how you want to handle it.
# One option is to save each date as a partition of one Hive table (a partitionBy sketch follows the loop below).
# Another option is saving each date as a file.
# It's just for the sake of simplicity whenever you need to read that painful dataset again.
for date in [row['date'] for row in date_df.collect()]:
    (joined_df
        .where(F.col('date') == date)
        .write
        .mode('overwrite')
        .csv(date)
    )
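
As a sketch of the partitioned-storage option mentioned in the comments above, the same data can also be written in a single pass by partitioning on the date column; the Parquet format and the output path 'climate_by_date' are assumptions, not part of the original answer:

# One write instead of a per-date loop: Spark creates one sub-directory per
# distinct date value (date=1990010100/, date=1990010101/, ...).
(joined_df
    .write
    .mode('overwrite')
    .partitionBy('date')
    .parquet('climate_by_date')
)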
