文章目录
倒排索引案例(多job串联)
倒排索引介绍
倒排索引(英语:Inverted index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。
有两种不同的反向索引形式:
- 一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表。
- 一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置。
后者的形式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建。
参见大佬博文:你有想过,如何用Hadoop实现【倒排索引】?
1. 需求
有大量的文本(文档、网页),需要建立搜索索引,如图所示。
(1)数据输入
(2)期望输出数据
atguigu c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
2. 需求分析
3. 代码实现
第一次处理 ---- job1
Mapper阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class OneIndexMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable();
String name;
/**
* 重写setup()获取文件名
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 1.获取文件名称
FileSplit inputSplit = (FileSplit) context.getInputSplit();
name = inputSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// atguigu pingping
// 1.读取一行数据
String line = value.toString();
// 2.切分
String[] fields = line.split(" ");
// 3.追加写出
for (String field:fields){
field += "--"+ name;
k.set(field);
v.set(1);
context.write(k,v);
}
}
}
Reducer阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OneIndexReducer extends Reducer <Text, IntWritable,Text,IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1.遍历统计
int sum = 0;
for (IntWritable value:values){
sum += value.get();
}
v.set(sum);
// 2.写出
context.write(key,v);
}
}
Driver阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import 第一章_MR概述.wcd;
import 第一章_MR概述.wcm;
import 第一章_MR概述.wcr;
public class OneIndexDriver {
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = null;
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.类的关联
job.setMapperClass(OneIndexMapper.class);
job.setReducerClass(OneIndexReducer.class);
job.setJarByClass(OneIndexDriver.class);
// 3.定义M、R阶段输出数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setoutputKeyClass(Text.class);
job.setoutputValueClass(IntWritable.class);
// 4.指定输入输出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\data"));
FileOutputFormat.setoutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output"));
// 5.提交job
job.waitForCompletion(true);
} catch (Exception e){
e.printstacktrace();
}
}
}
第二次处理 ---- job2
Mapper阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TwoIndexMapper extends Mapper<LongWritable, Text,Text,Text> {
Text k = new Text();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// atguigu--a.txt 3
// 1.读取一行数据
String line = value.toString();
// 2.拆分
String[] fields = line.split("--");
String[] parts = fields[1].split("\t");
// 3.拼接
String combine = parts[0]+"-->"+parts[1];
// 4.写出
k.set(fields[0]);
v.set(combine);
context.write(k,v);
}
}
Reducer阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TwoIndexReducer extends Reducer<Text,Text,Text,Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 1.获取遍历拼接
String result = "";
for (Text value:values){
result += value + "\t";
}
v.set(result);
// 2.输出
context.write(key,v);
}
}
Driver阶段
package 第七章_MR扩展案例.多job串联;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoIndexDriver {
public static void main(String[] args) {
Configuration conf = new Configuration();
Job job = null;
try {
// 1.获取job对象
job = Job.getInstance(conf);
// 2.类的关联
job.setMapperClass(TwoIndexMapper.class);
job.setReducerClass(TwoIndexReducer.class);
job.setJarByClass(TwoIndexDriver.class);
// 3.定义M、R阶段输出数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setoutputKeyClass(Text.class);
job.setoutputValueClass(Text.class);
// 4.指定输入输出路径
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output\\part-r-00000"));
FileOutputFormat.setoutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\output1"));
// 5.提交job
job.waitForCompletion(true);
} catch (Exception e){
e.printstacktrace();
}
}
}
Mapper\Reducer阶段做法二:
该方法相对本人的优化点在于,拼接的时候同时使用replace()方法转换形式,少了一次拆分。主要区别在于Mapper阶段的处理,本人选择先处理好
‘ --> ’
样式。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。