如何解决Hadoop mapreduce 2 文件过滤?
我需要打印那些没有订单号的客户的“姓名”。我知道我必须使用映射器方法来实例化变量。我还必须使用 2 个映射器,因为有 2 个输入文件。在reduce阶段,我必须过滤掉没有order.no的客户。但是,如何过滤掉那些没有订单号的客户?
File1.txt
Cust.No. Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
我做了什么
最初在 reducer 方法中,我 for 循环键并检查它是否与现有键匹配:
if (!(Data[0].equals("key")))
{
System.out.println(Data[1]);
}
但是,它会打印每一行。
解决方法
看起来像一个常规的reduce side join,所以它可能是一个简单的用例,但是这些类型的计算在工作负载方面往往变得非常残酷。这意味着我们必须找到为我们偷工减料的方法,以确保应用程序能够很好地扩展以适应更大规模的输入。
为应用程序的执行节省时间/空间的最常见方法是尝试以一种我们可以在保留所有功能的同时“削减”一个或多个作业的方式来设计潜在的多个 MR 作业,或者尝试最小化将在输入数据中实现的(自定义)映射器的数量。两者中的后者对于您尝试实现的这种过滤非常常见,因为我们可以轻松地仅使用一个 Map 函数,它的每个实例都会检查当前正在读取的文件的名称以采取相应的行动。>
更具体地说,我们可以在mapper开始运行之前通过Map类的File1.txt
函数获取File2.txt
和setup
文件名,并使用当前文件名被读取以确定如何将文件中的数据切割和存储为键值对。对于你的问题,这个 Map 函数将输出两种类型的键值对:
-
<customer_ID,customer_name>
(用于File1.txt
中的数据) -
<customer_ID,order_ID>
(用于File2.txt
中的数据)
然后,Reduce 函数的实例将为每个客户运行(当然,因为客户 ID 和名称是唯一的)并访问分组值,这些值只不过是多个 Text
对象,这些对象要么持有此客户的姓名或订单 ID。我们只想输出没有任何订单记录的客户,所以我们要做的就是检查这个值列表的长度是否为 1
(也就是如果这个客户没有其他值对用他的名字)。
为了展示这一点,我将两个输入文件放在 HDFS 的目录 /input
中(我对 File1.txt
中的列使用了两个制表符分隔符,对 File2.txt
中的列使用了三个制表符分隔符{1}}。如果您的文件在列之间有不同的制表符或空格,您可以相应地更改它们):
File1.txt
Cust.No Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
执行过滤的程序可能如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OrderListFilter
{
/* input: <byte_offset,line_of_dataset>
* output: <customer_ID,customer_name> OR <customer_ID,order_ID>
*/
public static class Map extends Mapper<LongWritable,Text,Text>
{
private String current_filename = "";
protected void setup(Context context)
{
// get the name of the current to-be-read file
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
current_filename = path.getName();
}
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
if(current_filename.equals("File1.txt")) // if mapper is reading through the customer's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t"); // 2 tabs as delimiter
// write customer ID as key and name as value
context.write(new Text(columns[0]),new Text(columns[1]));
}
}
else if(current_filename.equals("File2.txt")) // if mapper is reading through the order's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter
// write customer ID as key and order num as value
context.write(new Text(columns[1]),new Text(columns[0]));
}
}
}
}
/* input: <customer_ID,order_ID>
* output: <customer_ID,customer_name>
*/
public static class Reduce extends Reducer<Text,Text>
{
public void reduce(Text key,Iterable<Text> values,InterruptedException
{
List<String> customer_records = new ArrayList<String>();
// put all the values in a list to find the size of them
for(Text value : values)
customer_records.add(value.toString());
// if there's only one record,i.e. just the ID and the customer's name in they key-value pairs,// write their ID and name to output
if(customer_records.size() == 1)
context.write(key,new Text(customer_records.get(0)));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("output");
// in case the output directory already exists,delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir,true);
// configure the MapReduce job
Job job = Job.getInstance(conf,"Order List Filter");
job.setJarByClass(OrderListFilter.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,input_dir);
FileOutputFormat.setOutputPath(job,output_dir);
job.waitForCompletion(true);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。