微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法使用 PySpark 在地图缩减功能中增加累加器

如何解决无法使用 PySpark 在地图缩减功能中增加累加器

这是我试图实现的 mapReduce 算法的摘录,我需要在我的 reducefunctioniterate 内增加累加器计数器,但我一直无法做到。应该怎么做?谢谢

counter = spark.sparkContext.accumulator(0)

def iterate1(pairs):
  counter.value = 0
  double_pairs = pairs.flatMap(lambda pair: ((pair[0],pair[1]),(pair[1],pair[0])))
  adj = double_pairs.groupByKey().map(lambda x: (x[0],list(x[1])))
  red = adj.flatMap(lambda pair: reducefunctioniterate(pair))
  return red

def reducefunctioniterate(pair):
  key,values = pair
  valuelist = []
  for_output = []
  mini = key
  for adj_node in values:
    if adj_node < mini:
      mini = adj_node
    valuelist.append(adj_node)
  if mini < key:
    for_output.append([key,mini])
    for val in valuelist:
      if mini != val:
        counter.add(1)
        for_output.append([val,mini])
  return for_output

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。