微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

hadoop如何自定义GroupComparator实现求最大值

小编给大家分享一下hadoop如何自定义GroupComparator实现求最大值,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
 * 如下蚀测试数据,第一个字段为订单号,第二个字段为id 第三个字段为价格,数据以tab键分开
 * 要求求出订单号相同的价格最大的条记录
O00001	123	1234
O00002	124	3435
O00003	125	132.78
O00004	126	334
O00004	127	8976
O00003	128	635
O00002	129	23
O00001	130	980
O00001	131	111
O00002	132	66
O00003	133	42
O00004	134	88
O00005	135	900
结果如下:
O00001	123	1234.0
O00002	124	3435.0
O00003	128	635.0
O00004	127	8976.0
O00005	135	900.0
*
*/
public class GroupComparatorMian {
	static final Log LOG = LogFactory.getLog(GroupComparatorMian.class);
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(GroupComparatorMian.class);
		job.setMapperClass(GroupComparatorMapper.class);
		job.setReducerClass(GroupComparatorReducer.class);
		job.setoutputKeyClass(OrderBean.class);
		job.setoutputValueClass(NullWritable.class);
		job.setGroupingComparatorClass(CustGroupComparator.class);
		String jobName = "'Customize groupcomparator test'";
		job.setJobName(jobName);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setoutputPath(job, new Path(args[1]));
		boolean bb = job.waitForCompletion(true);
		if(bb) {
			LOG.info("Job "+ jobName +" is done.");
			
		}else {
			LOG.info("Job "+ jobName +"is going wrong,Now exit.");
			System.exit(0);
		}
	}
}
class CustGroupComparator extends WritableComparator{
	public CustGroupComparator() {
		super(OrderBean.class,true);
	}
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		OrderBean oa = (OrderBean)a;
		OrderBean ob = (OrderBean)b;
		return oa.getorder_id().compareto(ob.getorder_id());
	}
	
}
class OrderBean implements WritableComparable<OrderBean>{
	private String order_id;
	private String id ;
	private double prise;
	public OrderBean() {
		
	}
	
	public OrderBean(String order_id,String id,double prise) {
		this.order_id = order_id ;
		this.id = id;
		this.prise = prise;
	}
	public String getorder_id() {
		return order_id;
	}
	public void setorder_id(String order_id) {
		this.order_id = order_id;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public double getPrise() {
		return prise;
	}
	public void setPrise(double prise) {
		this.prise = prise;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(order_id);
		out.writeUTF(id);
		out.writeDouble(prise);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.order_id = in.readUTF();
		this.id = in.readUTF();
		this.prise = in.readDouble();
	}
	@Override
	public int compareto(OrderBean o) {
		int cnt = this.order_id.compareto(o.getorder_id());
		if(cnt==0) {
			cnt = (int) (-(this.prise- o.getPrise()));
		}
		return cnt;
		
	}
	@Override
	public String toString() {
		return  order_id + "\t" + id + "\t" + prise ;
	}
	
	
}
class GroupComparatorMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
	NullWritable nul =  NullWritable.get();
	OrderBean ob = new OrderBean();
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		String[] split =  value.toString().split("\t");
		ob.setorder_id(split[0]);
		ob.setId(split[1]);
		ob.setPrise(Double.parseDouble(split[2]));
		context.write(ob, nul);
	}
}
class GroupComparatorReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
	NullWritable nul = NullWritable.get();
	@Override
	protected void reduce(OrderBean bean, Iterable<NullWritable> iter,
			Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
			throws IOException, InterruptedException {
		context.write(bean, nul);
	}
	
	
}

以上是“hadoop如何自定义GroupComparator实现求最大值”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程之家行业资讯频道!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐