如何在 Palantir Foundry 中联合多个动态输入?

如何解决如何在 Palantir Foundry 中联合多个动态输入?

我想在 Palantir Foundry 中联合多个数据集,数据集的名称是动态的,因此我无法静态地在 transforms_df(..) 中给出数据集名称。有没有一种方法可以在 transforms_df 中动态获取多个输入并联合所有这些数据帧?

我尝试遍历数据集,例如:

li = ['dataset1_path','dataset2_path']

union_df = None
for p in li:
  @transforms_df(
    my_input = Input(p),Output(p+"_output")
  )

  def my_compute_function(my_input):
    return my_input

  if union_df is None:
    union_df = my_compute_function
  else:
    union_df = union_df.union(my_compute_function)

但是,这不会生成联合输出

如有任何帮助,将不胜感激,谢谢。

解决方法

通过一些更改,这应该可以为您工作,这是一个带有 json 文件的动态数据集的示例,您的情况可能只有一点不同。这是您可以执行动态 json 输入数据集的通用方法,该数据集应该适用于您可以指定的任何类型的动态输入文件类型或代工厂内部数据集。这个通用示例正在处理上传到平台中数据集节点的一组 json 文件。这应该是完全动态的。在这之后做工会应该是一件简单的事情。

这里也有一些奖励日志记录。

希望能帮到你

from transforms.api import Input,Output,transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {## enter your dynamic mappings here ##}

    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),inpt=Input(" path to input here ".format(val=value)),)
        def update_set(ctx,inpt,out):
            spark = ctx.spark_session
            sc = spark.sparkContext

            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)

            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline","true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date',F.current_date())

            df_2.drop_duplicates()
            out.write_dataframe(df_2)
        transforms.append(update_logs)
    return transforms


TRANSFORMS = transform_generator()
,

所以这个问题分为两个问题。

如何使用程序化输入路径处理转换

要处理程序化输入的转换,重要的是要记住两件事:

1st - Transforms 将在 CI 时间确定您的输入和输出。这意味着您可以拥有生成转换的 Python 代码,但您无法从数据集中读取路径,需要将它们硬编码到生成转换的 Python 代码中。

2nd - 您的转换将在 CI 执行期间创建一次。这意味着无论何时构建数据集,您都无法使用增量或特殊逻辑来生成不同的路径。

有了这两个前提,就像你的例子或@jeremy-david-gamet's(回复的ty,给你一个+1),你可以拥有在CI时生成路径的python代码。

dataset_paths = ['dataset1_path','dataset2_path']

for path in dataset_paths:
  @transforms_df(
    my_input = Input(path),Output(f"{path}_output")
  )
  def my_compute_function(my_input):
    return my_input

但是要合并它们,您需要第二次转换来执行合并,您需要传递多个输入,因此您可以为此使用 *args**kwargs

dataset_paths = ['dataset1_path','dataset2_path']

all_args = [Input(path) for path in dataset_paths]
all_args.append(Output("path/to/unioned_dataset"))
@transforms_df(*all_args)
def my_compute_function(*args):
    input_dfs = []
    for arg in args:
       # there are other arguments like ctx in the args list,so we need  to check for type. You can also use kwargs for more determinism.
       if isinstance(arg,pyspark.sql.DataFrame):
            input_dfs.append(arg)
    
    # now that you have your dfs in a list you can union them
    # Note I didn't test this code,but it should be something like this
    ...

如何合并具有不同模式的数据集。

对于这一部分,有很多关于如何在 spark 中合并不同数据帧的问答。这是从 https://stackoverflow.com/a/55461824/26004

复制的简短代码示例
from pyspark.sql import SparkSession,HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1,df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols,allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols,allcols)
        return list(cols)
    appended = df1.select(expr(cols1,total_cols)).union(df2.select(expr(cols2,total_cols)))
    return appended
,

由于输入和输出是在 CI 时间确定的,我们无法形成真正的动态输入。我们将不得不以某种方式指向代码中的特定数据集。假设数据集的路径共享相同的根,以下似乎需要最少的维护:

from transforms.api import transform_df,Input,Output
from functools import reduce


datasets = [
    'dataset1','dataset2','dataset3',]
inputs = {f'inp{i}': Input(f'input/folder/path/{x}') for i,x in enumerate(datasets)}
kwargs = {
    **{'output': Output('output/folder/path/unioned_dataset')},**inputs
}


@transform_df(**kwargs)
def my_compute_function(**inputs):
    unioned_df = reduce(lambda df1,df2: df1.unionByName(df2),inputs.values())
    return unioned_df

关于不同模式的联合,since Spark 3.1 one can use this

df1.unionByName(df2,allowMissingColumns=True)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?