本篇内容主要讲解“hive怎么指定多个字符作为列分隔符”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“hive怎么指定多个字符作为列分隔符”吧!
hive创建表指定分隔符,不支持多个字符作为分隔符,如果想使用多个字符作为分割符的话就需要实现InputFormat.主要重写next方法,代码如下
package hiveStream; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new MyRecordReader((FileSplit) genericSplit, job); } }
package hiveStream; import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.LineReader; public class MyRecordReader implements RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader lineReader; int maxLineLength; // 构造方法 public MyRecordReader(FileSplit inputSplit, Configuration job) throws IOException { maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE); start = inputSplit.getStart(); end = start + inputSplit.getLength(); final Path file = inputSplit.getPath(); // 创建压缩器 compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打开文件系统 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(file); boolean skipFirstLine = false; if (codec != null) { lineReader = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } lineReader = new LineReader(fileIn, job); } if (skipFirstLine) { start += lineReader.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public MyRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) { this.maxLineLength = maxLineLength; this.start = offset; this.lineReader = new LineReader(in); this.pos = offset; this.end = endOffset; } public MyRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException { this.maxLineLength = job.getInt( "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE); this.lineReader = new LineReader(in, job); this.start = offset; this.end = endOffset; } @Override public void close() throws IOException { if (lineReader != null) lineReader.close(); } @Override public LongWritable createKey() { return new LongWritable(); } @Override public Text createValue() { return new Text(); } @Override public long getPos() throws IOException { return pos; } @Override public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } @Override public boolean next(LongWritable key, Text value) throws IOException { while (pos < end) { key.set(pos); int newSize = lineReader.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 把字符串中的"##"转变为"#" String strReplace = value.toString().replace("##", "#"); Text txtReplace = new Text(); txtReplace.set(strReplace); value.set(txtReplace.getBytes(), 0, txtReplace.getLength()); if (newSize == 0) return false; pos += newSize; if (newSize < maxLineLength) return true; } return false; } }
建表语句:自定义 outputformat/inputformat 后,在建表时需要指定 outputformat/inputformat
create external table testHiveInput( id int,name string,age int) row format delimited fields terminated by '|' stored as INPUTFORMAT 'hiveStream.MyHiveInputFormat' OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeytextoutputFormat' location '/user/hdfs/hiveInput';
测试数据:
1##Tom##22
2##Jerry##22
3##Jeny##22
public static void testHive() throws Exception { String sql = "select id,name,age from testHiveInput"; Class.forName("org.apache.hive.jdbc.HiveDriver"); String url = "jdbc:hive2://xxx.xxx.xxx.xxx:10000"; Connection conn = DriverManager.getConnection(url, "hive", "passwd"); Statement stmt = conn.createStatement(); stmt.execute("add jar /usr/lib/hive/lib/hiveInput.jar"); ResultSet rs = stmt.executeQuery(sql); while (rs.next()) { System.out.println(rs.getString("id")+" "+ rs.getString("name") + " " + rs.getString("age")); } }
到此,相信大家对“hive怎么指定多个字符作为列分隔符”有了更深的了解,不妨来实际操作一番吧!这里是编程之家网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。