微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何处理代码存储库中的大文件?

如何解决如何处理代码存储库中的大文件?

我有一个数据馈送,每天提供一个大型 .txt 文件 (50-75GB)。该文件包含几个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,我该如何有效地做到这一点?

解决方法

您需要解决的最大问题是恢复架构的迭代速度,这对于这种规模的文件来说可能具有挑战性。

您最好的策略是获取一个示例“概念”文件,其中包含您要恢复的每个模式作为其中的一行,并将其添加为存储库中的文件。当您将此文件添加到您的存储库中(与您的转换逻辑一起)时,您就可以将其推送到数据帧中,就像使用数据集中的原始文件一样,以进行快速测试迭代。

首先,确保将 txt 文件指定为包内容的一部分,这样您的测试就会发现它们(这在 Read a file from a Python repository 下的文档中有所介绍):

您可以将存储库中的其他文件读取到转换上下文中。这可能有助于为您的转换代码设置参数以供参考。

首先,在你的 python 存储库中编辑 setup.py:

setup(
   name=os.environ['PKG_NAME'],# ...
    package_data={
        '': ['*.txt']
    }
)

我正在使用包含以下内容的 txt 文件:

my_column,my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing

此文本文件位于我的存储库中的以下路径:transforms-python/src/myproject/datasets/raw.txt

将文本文件配置为随逻辑一起提供后,将文件本身包含在存储库中后,您就可以包含以下代码。这段代码有几个重要的功能:

  1. 它将原始文件解析逻辑与将文件读入 Spark DataFrame 的阶段完全分开。这是为了方式可以将构建此 DataFrame 的方式留给测试基础架构或运行时,具体取决于您运行的位置。
  2. 这种保持逻辑分离的方式可以确保您想要进行的实际逐行解析是它自己的可测试函数,而不是让它完全存在于您的 my_compute_function
  3. 此代码使用 Spark 原生的 spark_session.read.text 方法,这将比逐行解析原始 txt 文件快几个数量级。这将确保并行化的 DataFrame 是您操作的对象,而不是在您的执行程序(或更糟的是,您的驱动程序)中逐行处理的单个文件。
from transforms.api import transform,Input,Output
from pkg_resources import resource_filename


def raw_parsing_logic(raw_df):
    return raw_df


@transform(
    my_output=Output("/txt_tests/parsed_files"),my_input=Input("/txt_tests/dataset_of_files"),)
def my_compute_function(my_input,my_output,ctx):
    all_files_df = None
    for file_status in my_input.filesystem().ls('**/**'):
        raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
        parsed_df = raw_parsing_logic(raw_df)
        all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
    my_output.write_dataframe(all_files_df)


def test_my_compute_function(spark_session):
    file_path = resource_filename(__name__,"raw.txt")
    raw_df = raw_parsing_logic(
      spark_session.read.text(file_path)
    )
    assert raw_df.count() > 0
    raw_columns_set = set(raw_df.columns)
    expected_columns_set = {"value"}
    assert len(raw_columns_set.intersection(expected_columns_set)) == 1

一旦您启动并运行此代码,您的 test_my_compute_function 方法将非常快速迭代,以便您可以完善架构恢复逻辑。这将使您在最后构建数据集变得更加容易,而且不会产生完整构建的任何开销。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。