
How do I iterate over the JSON files in a code repository and incrementally append them to a dataset?


I have ingested a dataset of 100,000 raw JSON files (about 100 GB in total) into Foundry through a data connection. I want to use a Python Transforms raw file access transform to read the files and flatten the structs and arrays of structs into a dataframe, applied as incremental updates to that dataframe. I would like to take the example below from the documentation, adapt it to *.json files, and turn it into an incremental transform that updates using the @incremental() decorator.

>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform,Input,Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color,processed):
...
...    def process_file(file_status):
...        with hair_eye_color.filesystem().open(file_status.path) as f:
...            r = csv.reader(f)
...
...            # Construct a pyspark.Row from our header row
...            header = next(r)
...            MyRow = Row(*header)
...
...            for row in csv.reader(f):
...                yield MyRow(*row)
...
...    files_df = hair_eye_color.filesystem().files('**/*.csv')
...    processed_df = files_df.rdd.flatMap(process_file).toDF()
...    processed.write_dataframe(processed_df)
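
A minimal sketch of how that example might be adapted to *.json files with the @incremental() decorator is shown below. The dataset paths are placeholders, and it assumes both that each file holds a single record or a list of records and that, under @incremental(), the input's filesystem() listing only covers files added since the last build; treat these details as assumptions to check against the Foundry documentation rather than a drop-in implementation.

import json
from pyspark.sql import Row
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    processed=Output('/examples/json_processed'),
    raw_json=Input('/examples/raw_json_files'),
)
def example_computation(raw_json, processed):

    def process_file(file_status):
        # Parse a single JSON file on the executor and emit one Row per record.
        with raw_json.filesystem().open(file_status.path, encoding='utf-8-sig') as f:
            data = json.load(f)
        # Assumption: each file is either one record or a list of records.
        records = data if isinstance(data, list) else [data]
        for record in records:
            yield Row(**record)

    # Assumption: under @incremental(), this listing only includes files added
    # since the previous build, so each run appends just the new records.
    files_df = raw_json.filesystem().files('**/*.json')
    processed_df = files_df.rdd.flatMap(process_file).toDF()
    processed.write_dataframe(processed_df)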

With help from @Jeremy David Gamet I was able to develop code that produces the dataset I want.

from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext

    # Read every JSON file on the driver and collect the parsed records.
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    for files in filesystem:
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        file_dates.append(data)

    # Re-serialise the collected records and let Spark infer the schema.
    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))

    df_2 = df_2.drop_duplicates()
    # flatten() expands the struct and array columns; it comes from the
    # linked answer on flattening array columns.
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)

The flatten function used above is the flatten_df helper from the linked answer on flattening array columns.
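
For reference, a minimal sketch of what such a flatten helper typically looks like is below. It recursively promotes StructType fields to top-level columns and explodes ArrayType columns; this illustrates the general approach and is not the exact code from the linked answer.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType


def flatten(df):
    # Keep expanding until no struct or array columns remain.
    complex_fields = {f.name: f.dataType for f in df.schema.fields
                      if isinstance(f.dataType, (ArrayType, StructType))}
    while complex_fields:
        col_name, col_type = next(iter(complex_fields.items()))
        if isinstance(col_type, StructType):
            # Promote each struct field to a top-level column named parent_child.
            expanded = [F.col(col_name + '.' + field.name).alias(col_name + '_' + field.name)
                        for field in col_type.fields]
            df = df.select('*', *expanded).drop(col_name)
        else:
            # Explode arrays into one row per element, keeping empty arrays as null rows.
            df = df.withColumn(col_name, F.explode_outer(F.col(col_name)))
        complex_fields = {f.name: f.dataType for f in df.schema.fields
                          if isinstance(f.dataType, (ArrayType, StructType))}
    return df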

The above code works for a small number of files, but with more than 100,000 files I run into the following error:

Connection To Driver Lost 

This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.
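
In the transform above, every file is parsed with json.load on the driver and appended to a Python list, which is exactly the kind of driver-side materialization this error describes. A minimal sketch of one way to avoid it is to hand the file paths to Spark and let the executors do the parsing; it assumes the transforms FileSystem object exposes a hadoop_path property and that ls() accepts a glob argument, so verify both against the Foundry documentation.

from transforms.api import transform, Input, Output


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    fs = inpt.filesystem()

    # Build fully qualified paths for every JSON file in the input dataset.
    # Assumption: the FileSystem object exposes a hadoop_path property.
    paths = [fs.hadoop_path + '/' + f.path for f in fs.ls(glob='**/*.json')]

    # Spark parses the files on the executors, so the driver never holds
    # the raw JSON in memory.
    df = spark.read.option('multiline', 'true').json(paths)
    df = flatten(df)  # same flatten helper as above
    out.write_dataframe(df.drop_duplicates())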

Is there a way around this?

Solution

I have given an example of how this can be done dynamically as an answer to another question.

Here is the link to that answer: How to union multiple dynamic inputs in Palantir Foundry?, and a copy of the same code:

from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())

            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_set)
    return transforms


TRANSFORMS = transform_generator()
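
The transf_dict placeholder and the templated path strings are the part you fill in. A hypothetical example of what that might look like is below; the names and paths are illustrative only and not part of the original answer.

# Hypothetical mapping; as written, the loop iterates the dictionary keys,
# so each key is substituted as {val} into both path templates, e.g.
#   out=Output('/Foundry/project/clean/{val}'.format(val=value))
#   inpt=Input('/Foundry/project/raw/{val}'.format(val=value))
transf_dict = {
    'orders': 'orders_raw',
    'customers': 'customers_raw',
}

The module-level TRANSFORMS list is assumed to be picked up by the project's pipeline registration; if it is not, the generated transforms may need to be added to the pipeline explicitly.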

Please let me know if there is anything I can clarify.
