微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在hadoop中组合两个独立映射器的结果?

如何解决如何在hadoop中组合两个独立映射器的结果?

我想合并两个单独的映射器的结果,然后对合并的结果执行reducer。

我有两个文件。第一个文件包含以下列:A、B、C。第二个文件:A、D。 现在,两个映射器具有相同的签名:Mapper<LongWritable,Text,LongWritable,Text>。如果满足特定条件,则第一个映射器的输出KEY: new LongWritable(A)VALUE: new Text(B,C)。如果满足另一个条件,则第二个 KEY: new LongWritable(A)VALUE: new Text(D)输出

现在,当我在减速器中从 Iterable<Text> 输出值时,我获得 B+C D。 鉴于两个集合有交集,我如何在 reducer 中获得给定 A 的 B、C、D?

解决方法

我有类似的用例,我通过向一个映射器添加一个令牌来解决这个问题,以便了解该记录来自 reducer 中的哪个文件(fileA 或 fileB),然后将它们分开。

假设 fileA 是这样的:

A B C
C D D
A D D
A X Y

fileB 就像:

A ALICE
C BOB
A ALICE
A BOB

我这样写映射器(看,我在每个值的开头添加了一个美元符号,我在 reducer 中使用了这个美元符号):

public static class FileAMapper extends Mapper<LongWritable,Text,Text> {

        private String specifier = "$";
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {
                String line = value.toString();
                String[] parts = line.split(" ");
                String k = parts[0];
                String v = specifier + parts[1] + " " + parts[2];
                context.write(new Text(k),new Text(v));
            }
        }

下一个映射器:

public static class FileBMapper extends Mapper<LongWritable,Text> {


    public void map(LongWritable key,InterruptedException {
        String line = value.toString();
        String[] parts = line.split(" ");
        String k = parts[0];
        String v = parts[1];
        context.write(new Text(k),new Text(v));
    }
}

现在是reducer:我定义了两个数组列表,以便根据我在映射器中使用的美元符号来分隔每个值

protected void reduce(Text key,Iterable<Text> values,InterruptedException {
            List<String> left = new ArrayList<>();
            List<String> right = new ArrayList<>();
            values.forEach((e) -> {
                String temp = e.toString();
                if (temp.startsWith("$")) {
                    left.add(temp.substring(1));
                } else {
                    right.add(temp);
                }
            });


            left.forEach(l -> 
                    right.forEach(r -> 
                            System.out.println(String.format("%s %s %s",key.toString(),l,r))));
        }
    }

结果:

A B C ALICE
A B C ALICE
A B C BOB
A D D ALICE
A D D ALICE
A D D BOB
A X Y ALICE
A X Y ALICE
A X Y BOB
C D D BOB

司机:

Job job = new Job(new Configuration());
job.setJarByClass(Main.class);

Path fileA = new Path("input/fileA");
Path fileB = new Path("input/fileB");
Path outputPath =   new Path("output");

MultipleInputs.addInputPath(job,fileA,TextInputFormat.class,FileAMapper.class);
MultipleInputs.addInputPath(job,fileB,FileBMapper.class);
FileOutputFormat.setOutputPath(job,outputPath);
job.setMapOutputKeyClass(Text.class);
job.setReducerClass(JoinReducer.class);
job.waitForCompletion(true);

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。