微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ruby – 如何在kiba etl脚本(kiba gem)中进行聚合转换?

我想写一个Kiba Etl脚本,它有一个从CSV到目标CSV的源,带有一个转换规则列表,其中第二个转换器是一个聚合,其中的操作如选择名称,sum(euro)group by name

Kiba ETL脚本文件

source CsvSource,'users.csv',col_sep: ';',headers: true,header_converters: :symbol

transform VerifyFieldsPresence,[:name,:euro]

transform AggregateFields,{ sum: :euro,group_by: :name}

transform RenameField,from: :euro,to: :total_amount

destination CsvDestination,'result.csv',:total_amount]

users.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv(预期结果)

total_amount;name
16;Jack
97;Jill
99;Mack

由于etl变换器一次在一行上一个一个地执行,但我的第二个变换器行为依赖于我无法在传递给transform方法的类中访问它的整个行集合.

transform AggregateFields,group_by: :name }

是否有可能使用kiba gem实现这种行为
先感谢您

解决方法

木巴作者在这里!您可以通过多种方式实现这一目标,主要取决于数据大小和实际需求.这里有几种可能性.

使用Kiba脚本中的变量进行聚合

require 'awesome_print'

transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here,but you Could save to a CSV too
  ap total_amounts
end

这是最简单的方法,但这非常灵活.

它会将你的聚合内容保存在内存中,所以这可能是好的或不是,取决于你的场景.请注意,目前Kiba是单线程的(但“Kiba Pro”将是多线程的),因此现在无需为聚合添加锁或使用线程安全结构.

从post_process块调用TextQL

另一种快速简便的聚合方法是首先生成非聚合的CSV文件,然后利用TextQl实际进行聚合,如下所示:

destination CsvSource,'non-aggregated-output.csv',:amount]

post_process do
  query = <<sql
    select
      name,/* apparently sqlite has reduced precision,round to 2 for Now */
      round(sum(amount),2) as total_amount
    from tbl group by name
sql

  textql('non-aggregated-output.csv',query,'aggregated-output.csv')
end

定义了以下助手:

def system!(cmd)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file,output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end

在进行计算时要小心精度.

编写内存中的聚合目标

这里可以使用的一个有用的技巧是用一个类包装一个给定的目标来进行聚合.这是它的样子:

class InMemoryAggregate
  def initialize(sum:,group_by:,destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba,but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write,but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k,@sum => value)
    end
    @destination.close
  end
end

你可以用这种方式:

# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal.new(r[:amount])
  r
end

destination CsvDestination,'non-aggregated.csv',:amount]

destination InMemoryAggregate,sum: :amount,group_by: :name,destination: [
    CsvDestination,'aggregated.csv',:amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end

这个版本的好处是你可以将聚合器重用于不同的目的地(比如数据库或其他任何目的地).

请注意,这会将所有聚合保留在内存中,就像第一个版本一样.

插入具有聚合功能的商店

另一种方法(如果你有非常大的卷,特别有用)是将结果数据发送到能够为你聚合数据的东西.它可以是一个常规的sql数据库,Redis,或任何更奇特的东西,然后您可以根据需要进行查询.

正如我所说,实施将在很大程度上取决于您的实际需求.希望你能找到适合你的东西!

原文地址:https://www.jb51.cc/ruby/264665.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐