Applying Python functions with parameters in an Apache Beam pipeline

How can I apply a Python function that takes parameters in an Apache Beam pipeline?

I'm using Apache Beam in Python to preprocess data like the sample shown below:

userid   itemid  rating timestamp
1          2       3.2    10:59
1          3       3.5    11:59
2          3       4.2    10:10
2          4       1.5    10:59 

My code is shown below:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

print(beam.__version__)  # show the installed Beam version


# delete a column (e.g. timestamp) from a row dict
def del_col(data, col_name: str):
    # return a new dict instead of mutating the input element in place
    return {k: v for k, v in data.items() if k != col_name}


# check that three columns are non-empty
def check_null(data, col_name1: str, col_name2: str, col_name3: str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0


# convert a row dict to comma-delimited format
def format_data(data):
    return ','.join([data['userid'], data['itemid'], data['ratings']])


def print_row(data):
    print(data)


if __name__ == '__main__':
    options = PipelineOptions()
    input_file = 'data.csv'
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'ReadData' >> beam.io.ReadFromText(input_file, skip_header_lines=1)  # read data with Beam, skipping the header row
         | 'SplitData' >> beam.Map(lambda x: x.split(','))
         | 'FormatToDict' >> beam.Map(lambda x: {"userid": x[0], "itemid": x[1], "ratings": x[2], "timestamp": x[3]})  # format to dict and name the columns
         # fails: check_null(...) is called here, while the pipeline is being built, with no data argument
         | 'DeleteNullData' >> beam.Filter(check_null('userid', 'itemid', 'ratings'))  # keep rows whose columns are non-empty
         # fails the same way: del_col(...) is called without data
         | 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp'))  # delete irrelevant columns
         | 'FormatData' >> beam.Map(format_data)
         | 'PrintData' >> beam.Map(print_row))
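In the pipeline above, beam.Filter(check_null('userid', 'itemid', 'ratings')) evaluates check_null immediately, while the pipeline is being constructed, and at that point there is no row to pass as data. beam.Map and beam.Filter expect a callable whose first argument is the element. One way to build such a callable is functools.partial from the standard library. The sketch below is plain Python (no Beam needed); it copies the question's helpers, with del_col rewritten to return a new dict, and the names drop_timestamp and has_required are hypothetical.

```python
from functools import partial


# Copies of the question's helpers (del_col rewritten to return a new dict).
def del_col(data, col_name: str):
    return {k: v for k, v in data.items() if k != col_name}


def check_null(data, col_name1: str, col_name2: str, col_name3: str):
    return len(data[col_name1]) > 0 and len(data[col_name2]) > 0 and len(data[col_name3]) > 0


# partial() fixes the column-name arguments and leaves `data` open, so each
# result is a one-argument callable of the kind beam.Map / beam.Filter expect.
drop_timestamp = partial(del_col, col_name='timestamp')
has_required = partial(check_null, col_name1='userid', col_name2='itemid', col_name3='ratings')

row = {"userid": "1", "itemid": "2", "ratings": "3.2", "timestamp": "10:59"}
print(drop_timestamp(row))  # {'userid': '1', 'itemid': '2', 'ratings': '3.2'}
print(has_required(row))    # True
```

Inside the pipeline these would be used as beam.Filter(has_required) and beam.Map(drop_timestamp); an inline lambda such as beam.Map(lambda row: del_col(row, 'timestamp')) works the same way.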

The problem is that I need to make these functions modular, i.e. reusable with different column names, and I don't know how to write them that way.
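One way to make the helpers reusable is to take the column names as parameters, using *col_names so that any number of columns can be passed. The function names check_not_empty, drop_columns and to_delimited are hypothetical, chosen for this sketch; they are not part of Beam or of the original code.

```python
def check_not_empty(data, *col_names):
    # True only if every listed column holds a non-empty string
    return all(len(data[col]) > 0 for col in col_names)


def drop_columns(data, *col_names):
    # return a copy of the row without the listed columns (input is not modified)
    return {k: v for k, v in data.items() if k not in col_names}


def to_delimited(data, col_names, sep=','):
    # join the listed columns, in the given order, into one delimited string
    return sep.join(data[col] for col in col_names)


row = {"userid": "1", "itemid": "2", "ratings": "3.2", "timestamp": "10:59"}
print(check_not_empty(row, 'userid', 'itemid', 'ratings'))  # True
print(drop_columns(row, 'timestamp'))  # {'userid': '1', 'itemid': '2', 'ratings': '3.2'}
print(to_delimited(row, ['userid', 'itemid', 'ratings']))  # 1,2,3.2
```

Because Beam's Map and Filter forward extra arguments to the function, these plug in as, for example, beam.Filter(check_not_empty, 'userid', 'itemid', 'ratings'), beam.Map(drop_columns, 'timestamp') and beam.Map(to_delimited, ['userid', 'itemid', 'ratings']).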

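Also, how do I pass a function together with its arguments to an Apache Beam pipeline? I tried the following, but it doesn't work because the data argument never gets passed in: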

        | 'DeleteNullData' >> beam.Filter(check_null('userid', 'itemid', 'ratings'))  # pick non-null columns
        | 'DeleteUnwantedData' >> beam.Map(del_col(col_name='timestamp'))  # delete irrelevant columns
