How to fix the HBase bulk load error [java.io.IOException: Initialization of all the collectors failed]
[Problem summary]: I am trying to bulk load a CSV file into HBase and am hitting an error in the Mapper stage. I followed this tutorial.
The standard bulk load procedure is as follows. First, load the data into an HDFS directory; this step is straightforward and completed without errors. The second step is to generate the HFiles. Following the tutorial mentioned above, I wrote a class named GenerateHFilesFromImages, and it is in this second step that the Mapper stage fails. The error description and a sample dataset are provided below, along with the GenerateHFilesFromImages code. What could be causing this error in the Mapper stage?
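For context, this standard procedure ends with a third step that moves the generated HFiles into the table. A minimal sketch of that step, assuming the classic LoadIncrementalHFiles API (replaced by BulkLoadHFiles in recent HBase releases) and a hypothetical table name and HFile directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class CompleteBulkLoad {
    public static void main(String[] args) throws Exception {
        // Hypothetical names: the target table and the HFile directory
        // produced by the second step.
        TableName tableName = TableName.valueOf("cn1374_testCSV1");
        Path hfileDir = new Path(args[0]);

        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(tableName);
             RegionLocator locator = connection.getRegionLocator(tableName)) {
            // Moves the generated HFiles into the table's region directories.
            new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table, locator);
        }
    }
}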
The error message is as follows:
java.io.IOException: Initialization of all the collectors failed. Error in last collector was :java.lang.ClassCastException: class GenerateHFilesFromImages$GenerateHFilesMapper
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:423)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:710)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:782)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:187)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.ClassCastException: class GenerateHFilesFromImages$GenerateHFilesMapper
at java.lang.Class.asSubclass(Class.java:3404)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:885)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1018)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:408)
... 9 more
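Reading the trace from the bottom up: the map task's sort buffer (MapOutputBuffer.init) asks JobConf.getOutputKeyComparator for a comparator over the configured map output key class, and Class.asSubclass rejects GenerateHFilesFromImages$GenerateHFilesMapper because a Mapper is not a WritableComparable. Roughly, the failing code path looks like this (paraphrased from the Hadoop 2.x JobConf source, not verbatim):

public RawComparator getOutputKeyComparator() {
    // An explicitly configured comparator wins ...
    Class<? extends RawComparator> theClass =
        getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null) {
        return ReflectionUtils.newInstance(theClass, this);
    }
    // ... otherwise fall back to the map output key class, which must be a
    // WritableComparable; asSubclass throws ClassCastException if it is not.
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}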
The complete Java class is as follows.
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import java.io.IOException;
/**
* @author chamin 30/07/2021
*/
public class GenerateHFilesFromImages extends Configured implements Tool {

    public static final String NAME = "GenerateHFilesFromImages";
    static Logger LOGGER = Logger.getLogger(GenerateHFilesFromImages.class);

    public static void main(String[] args) throws Exception {
        int err = ToolRunner.run(new Configuration(), new GenerateHFilesFromImages(), args);
        System.exit(err);
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        CommandLine cmd = parseArgs(otherArgs);
        String inputFile = cmd.getOptionValue("i");
        String outputFile = cmd.getOptionValue("o");
        //conf.setInt("model", Integer.parseInt(cmd.getOptionValue("m")));
        String tableName = cmd.getOptionValue("table_name");
        conf.set("tableName", tableName);
        String[] connectionParams = null;
        if (cmd.hasOption("conn")) {
            connectionParams = cmd.getOptionValues("conn");
        }
        if (connectionParams != null) {
            conf.set(Constants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM, connectionParams[0]);
            LOGGER.debug(String.format("Set quorum string %s", conf.get(Constants.HBASE_CONFIGURATION_ZOOKEEPER_QUORUM)));
            conf.setInt(Constants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENT_PORT, Integer.parseInt(connectionParams[1]));
            LOGGER.debug(String.format("Set port %d", conf.getInt(Constants.HBASE_CONFIGURATION_ZOOKEEPER_CLIENT_PORT, 0)));
        }
        Job job = Job.getInstance(conf, NAME + "\n");
        job.setJarByClass(GenerateHFilesFromImages.class);
        job.setMapOutputKeyClass(GenerateHFilesMapper.class);
        FileInputFormat.addInputPath(job, new Path(inputFile)); // checked
        FileOutputFormat.setOutputPath(job, new Path(outputFile)); // checked
        job.setInputFormatClass(TextInputFormat.class);
        // hand the output configurations over to HFileOutputFormat2
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(tableName));
        RegionLocator region = connection.getRegionLocator(TableName.valueOf(tableName));
        HFileOutputFormat2.configureIncrementalLoad(job, table, region);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    private static CommandLine parseArgs(String[] args) {
        Options options = new Options();
        Option o;
        // input path
        o = new Option("i", "input", true, "input directory");
        o.setRequired(true);
        options.addOption(o);
        // output path
        o = new Option("o", "output", true, "output directory");
        o.setRequired(true);
        options.addOption(o);
        // table name
        options.addOption("table_name", true, "table name");
        // model number
        //options.addOption("m", "model", "data model number");
        // connection parameters
        o = new Option("conn", "connection", true, "Zookeeper quorum and port (optional)");
        o.setArgs(2);
        o.setRequired(false);
        options.addOption(o);
        // debug flag
        options.addOption("d", "debug", false, "switch on DEBUG log level");
        CommandLineParser parser = new PosixParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage() + "\n");
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(NAME + " ", options, true);
            System.exit(-1);
        }
        if (cmd.hasOption("d")) {
            LOGGER.setLevel(Level.DEBUG);
            System.out.println("DEBUG ON");
        }
        return cmd;
    }
    public static class GenerateHFilesMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        public void setup(Context context) {
            Configuration conf = context.getConfiguration();
            // tableName = "cn1374_testCSV1";
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split one CSV line; column 29 holds the row key.
            String[] values = value.toString().split(",");
            String rowKey = values[29];
            Put put = new Put(Bytes.toBytes(rowKey));
            // Write five columns into the "info" column family.
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("first_name"), Bytes.toBytes(values[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes(values[2]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes(values[3]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c"), Bytes.toBytes(values[4]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("d"), Bytes.toBytes(values[5]));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        }
    }
}
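For comparison, the job wiring that HFileOutputFormat2 bulk load examples generally use looks like the sketch below (fragment only, reusing the variables from run() above; a sketch, not a verified fix). The suspicious line in run() is job.setMapOutputKeyClass(GenerateHFilesMapper.class): it registers the Mapper as the map output key type, which matches the asSubclass failure in the trace. The Mapper belongs in setMapperClass, while the map output key/value classes should be the types the Mapper actually emits:

// Relevant lines only; job, table and region come from run() above.
job.setMapperClass(GenerateHFilesMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class); // lets configureIncrementalLoad pick PutSortReducer
HFileOutputFormat2.configureIncrementalLoad(job, table, region);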