如何解决无法使用 PySpark 在地图缩减功能中增加累加器
这是我试图实现的 mapReduce 算法的摘录,我需要在我的 reducefunctioniterate 内增加累加器计数器,但我一直无法做到。应该怎么做?谢谢
counter = spark.sparkContext.accumulator(0)
def iterate1(pairs):
counter.value = 0
double_pairs = pairs.flatMap(lambda pair: ((pair[0],pair[1]),(pair[1],pair[0])))
adj = double_pairs.groupByKey().map(lambda x: (x[0],list(x[1])))
red = adj.flatMap(lambda pair: reducefunctioniterate(pair))
return red
def reducefunctioniterate(pair):
key,values = pair
valuelist = []
for_output = []
mini = key
for adj_node in values:
if adj_node < mini:
mini = adj_node
valuelist.append(adj_node)
if mini < key:
for_output.append([key,mini])
for val in valuelist:
if mini != val:
counter.add(1)
for_output.append([val,mini])
return for_output
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。