微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在一个 rdd 中执行和存储各种聚合

如何解决在一个 rdd 中执行和存储各种聚合

我有这样的数据:

[('a',110),('a',130),120),('b',200),206)]

我想对键进行分组并对值执行计数、平均值、最小值和最大值以获得以下结果:

[('a',3,120,110,2,203,200,206)]

我大致知道如何使用 countByKey() 和 reduceByKey() 自己完成每个聚合,但我不确定如何将它们全部包含在一个 RDD 中。有什么帮助吗?

编辑:这是我真实rdd的片段

Out[16]: [('Alaska Airlines Inc.',17.0),('Alaska Airlines Inc.',63.0),70.0),16.0),('United Airlines',9.0),197.0),115.0),6.0),1.0),

解决方法

好吧,我设法通过使用 aggregateByKey 函数和 map 返回所需的“架构”来获得您的解决方案:

data = sc.parallelize([('a',110),('a',120),130),('b',200),206)])


def sequence_operator(accumulator,element):
  return (accumulator[0] + 1,accumulator[1] + element,min(accumulator[2],element),max(accumulator[3],element))


def combination_operator(current_accumulator,next_accumulator):
  return (current_accumulator[0] + next_accumulator[0],current_accumulator[1] + next_accumulator[1],min(current_accumulator[2],next_accumulator[2]),max(current_accumulator[3],next_accumulator[3]))


def unpack_aggregations(data):
  key = data[0]
  count,total,minimum,maximum = data[1]
  return key,count,total / count,maximum


aggregations = data.aggregateByKey(zeroValue=(0,float('inf'),float('-inf')),seqFunc=sequence_operator,combFunc=combination_operator)
mapped_data = aggregations.map(unpack_aggregations)
print(mapped_data.collect())

输出

[('a',3,120.0,110,2,203.0,200,206)]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。