微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【MapReduce】扩展案例 ---- 倒排索引多job串联


文章目录


倒排索引案例(多job串联)


倒排索引介绍

在这里插入图片描述

倒排索引(英语:Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。
有两种不同的反向索引形式:

  • 一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表
  • 一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置。
    后者的形式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建

参见大佬博文:你有想过,如何用Hadoop实现【倒排索引】?

返回顶部


1. 需求

有大量的文本(文档、网页),需要建立搜索索引,如图所示。

(1)数据输入

在这里插入图片描述

(2)期望输出数据

atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping	c.txt-->1	b.txt-->3	a.txt-->1	
ss	c.txt-->1	b.txt-->1	a.txt-->2	

返回顶部


2. 需求分析

在这里插入图片描述

返回顶部


3. 代码实现

第一次处理 ---- job1

Mapper阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class OneIndexMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable();
    String name;
    /**
     * 重写setup()获取文件名
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1.获取文件名称
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // atguigu pingping
        // 1.读取一行数据
        String line = value.toString();
        // 2.切分
        String[] fields = line.split(" ");
        // 3.追加写出
        for (String field:fields){
            field += "--"+ name;
            k.set(field);
            v.set(1);
            context.write(k,v);
        }
    }
}

在这里插入图片描述

返回顶部


Reducer阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class OneIndexReducer extends Reducer <Text, IntWritable,Text,IntWritable> {
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1.遍历统计
        int sum = 0;
        for (IntWritable value:values){
            sum += value.get();
        }
        v.set(sum);
        // 2.写出
        context.write(key,v);
    }
}

返回顶部


Driver阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import 第一章_MR概述.wcd;
import 第一章_MR概述.wcm;
import 第一章_MR概述.wcr;

public class OneIndexDriver {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        Job job = null;
        try {
            // 1.获取job对象
            job = Job.getInstance(conf);
            // 2.类的关联
            job.setMapperClass(OneIndexMapper.class);
            job.setReducerClass(OneIndexReducer.class);
            job.setJarByClass(OneIndexDriver.class);
            // 3.定义M、R阶段输出数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setoutputKeyClass(Text.class);
            job.setoutputValueClass(IntWritable.class);
            // 4.指定输入输出路径
            FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\data"));
            FileOutputFormat.setoutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output"));
            // 5.提交job
            job.waitForCompletion(true);
        } catch (Exception e){
            e.printstacktrace();
        }
    }
}

返回顶部


第二次处理 ---- job2

Mapper阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TwoIndexMapper extends Mapper<LongWritable, Text,Text,Text> {

    Text k = new Text();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // atguigu--a.txt	3
        // 1.读取一行数据
        String line = value.toString();
        // 2.拆分
        String[] fields = line.split("--");
        String[] parts = fields[1].split("\t");
        // 3.拼接
        String combine = parts[0]+"-->"+parts[1];
        // 4.写出
        k.set(fields[0]);
        v.set(combine);
        context.write(k,v);
    }
}

返回顶部


Reducer阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class TwoIndexReducer extends Reducer<Text,Text,Text,Text> {
    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 1.获取遍历拼接
        String result = "";
        for (Text value:values){
            result += value + "\t";
        }
        v.set(result);
        // 2.输出
        context.write(key,v);
    }
}

返回顶部


Driver阶段

package 第七章_MR扩展案例.多job串联;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIndexDriver {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        Job job = null;
        try {
            // 1.获取job对象
            job = Job.getInstance(conf);
            // 2.类的关联
            job.setMapperClass(TwoIndexMapper.class);
            job.setReducerClass(TwoIndexReducer.class);
            job.setJarByClass(TwoIndexDriver.class);
            // 3.定义M、R阶段输出数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setoutputKeyClass(Text.class);
            job.setoutputValueClass(Text.class);
            // 4.指定输入输出路径
            FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output\\part-r-00000"));
            FileOutputFormat.setoutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output1"));
            // 5.提交job
            job.waitForCompletion(true);
        } catch (Exception e){
            e.printstacktrace();
        }
    }
}

在这里插入图片描述


Mapper\Reducer阶段做法二:

在这里插入图片描述


方法相对本人的优化点在于,拼接的时候同时使用replace()方法转换形式,少了一次拆分。主要区别在于Mapper阶段的处理,本人选择先处理好 ‘ --> ’ 样式。

返回顶部


版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐