如何解决如何在hadoop中组合两个独立映射器的结果?
我想合并两个单独的映射器的结果,然后对合并的结果执行reducer。
我有两个文件。第一个文件包含以下列:A、B、C。第二个文件:A、D。
现在,两个映射器具有相同的签名:Mapper<LongWritable,Text,LongWritable,Text>
。如果满足特定条件,则第一个映射器的输出为 KEY: new LongWritable(A)
和 VALUE: new Text(B,C)
。如果满足另一个条件,则第二个 KEY: new LongWritable(A)
和 VALUE: new Text(D)
的输出。
现在,当我在减速器中从 Iterable<Text>
输出值时,我获得 B+C 或 D。
鉴于两个集合有交集,我如何在 reducer 中获得给定 A 的 B、C、D?
解决方法
我有类似的用例,我通过向一个映射器添加一个令牌来解决这个问题,以便了解该记录来自 reducer 中的哪个文件(fileA 或 fileB),然后将它们分开。
假设 fileA
是这样的:
A B C
C D D
A D D
A X Y
和 fileB
就像:
A ALICE
C BOB
A ALICE
A BOB
我这样写映射器(看,我在每个值的开头添加了一个美元符号,我在 reducer 中使用了这个美元符号):
public static class FileAMapper extends Mapper<LongWritable,Text,Text> {
private String specifier = "$";
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {
String line = value.toString();
String[] parts = line.split(" ");
String k = parts[0];
String v = specifier + parts[1] + " " + parts[2];
context.write(new Text(k),new Text(v));
}
}
下一个映射器:
public static class FileBMapper extends Mapper<LongWritable,Text> {
public void map(LongWritable key,InterruptedException {
String line = value.toString();
String[] parts = line.split(" ");
String k = parts[0];
String v = parts[1];
context.write(new Text(k),new Text(v));
}
}
现在是reducer:我定义了两个数组列表,以便根据我在映射器中使用的美元符号来分隔每个值
protected void reduce(Text key,Iterable<Text> values,InterruptedException {
List<String> left = new ArrayList<>();
List<String> right = new ArrayList<>();
values.forEach((e) -> {
String temp = e.toString();
if (temp.startsWith("$")) {
left.add(temp.substring(1));
} else {
right.add(temp);
}
});
left.forEach(l ->
right.forEach(r ->
System.out.println(String.format("%s %s %s",key.toString(),l,r))));
}
}
结果:
A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB
司机:
Job job = new Job(new Configuration());
job.setJarByClass(Main.class);
Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath = new Path("output");
MultipleInputs.addInputPath(job,fileA,TextInputFormat.class,FileAMapper.class);
MultipleInputs.addInputPath(job,fileB,FileBMapper.class);
FileOutputFormat.setOutputPath(job,outputPath);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.waitForCompletion(true);
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。