This article explains how to customize an InputFormat for MapReduce in Hadoop. It should be a useful reference for anyone who needs to read records that are delimited by something other than a plain newline.

First, define a class that extends FileInputFormat and override createRecordReader so that it returns a RecordReader. Then define a subclass of RecordReader; the object that createRecordReader returns is an instance of that subclass.

The code is as follows:
package input;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // Hand each split to the custom record reader defined below.
        return new TrackRecordReader();
    }
}
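With the input format in place, a job selects it through Job.setInputFormatClass. Below is a minimal driver sketch, not from the original article: the class name TrackJobDriver, the job name, and TrackMapper (sketched at the end of this article) are placeholder assumptions, and it uses the Hadoop 2.x Job.getInstance API.

package input;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TrackJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "track-input-demo");
        job.setJarByClass(TrackJobDriver.class);
        // Use the custom InputFormat so each map input value is one
        // "]@\n"-terminated record instead of one newline-terminated line.
        job.setInputFormatClass(TrackInputFormat.class);
        job.setMapperClass(TrackMapper.class); // hypothetical mapper, sketched later
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Next comes the RecordReader that does the actual record splitting: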
package input;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
/**
 * Treats keys as the byte offset of a record within the file and values as
 * one record. Unlike
 * {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}, a record is
 * terminated by the custom separator "]@\n" rather than by a single '\n'.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private NewLineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
// The record separator: a record ends with "]@" followed by a newline
// rather than with a plain '\n'.
private byte[] separator = "]@\n".getBytes();
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
// Maximum record length; the modern name for this setting is
// "mapreduce.input.linerecordreader.line.maxlength".
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
//logger.info("path========================="+file.toString());
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
//logger.info("codec========================="+codec);
if (codec != null) {
in = new NewLineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
// Back up by the separator length so that a separator straddling the
// split boundary is read in full.
this.start -= separator.length;
fileIn.seek(start);
}
in = new NewLineReader(fileIn, job);
}
if (skipFirstLine) { // Skip the first (partial) record and re-establish "start".
start += in.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
}
if (newSize == 0) {
// Nothing left to read in this split.
key = null;
value = null;
return false;
} else {
// A complete record was read into "value".
return true;
}
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
public class NewLineReader {
// Default read buffer size. Note that this is 256 MB per reader; Hadoop's
// own LineReader defaults to 64 KB, so a smaller value may be sensible.
private static final int DEFAULT_BUFFER_SIZE = 256 * 1024 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private InputStream in;
private byte[] buffer;
private int bufferLength = 0;
private int bufferPosn = 0;
public NewLineReader(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
public NewLineReader(InputStream in, int bufferSize) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
}
public NewLineReader(InputStream in, Configuration conf)
throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}
public void close() throws IOException {
in.close();
}
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
str.clear();
Text record = new Text();
int txtLength = 0;
long bytesConsumed = 0L;
boolean newline = false;
int sepPosn = 0;
do {
// The buffer has been fully consumed; refill it from the stream.
if (this.bufferPosn >= this.bufferLength) {
bufferPosn = 0;
bufferLength = in.read(buffer);
// End of the stream has been reached; break out.
if (bufferLength <= 0) {
break;
}
}
int startPosn = this.bufferPosn;
for (; bufferPosn < bufferLength; bufferPosn++) {
// Handle a separator whose tail was split across the previous buffer
// boundary (this can misfire if the separator repeats characters).
if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
sepPosn = 0;
}
// Hit a byte matching the separator at the current match position.
if (buffer[bufferPosn] == separator[sepPosn]) {
bufferPosn++;
int i = 0;
// Check whether the following bytes complete the separator.
for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
// The buffer ends exactly in the middle of the separator; the rest
// will arrive in the next buffer.
if (bufferPosn + i >= bufferLength) {
bufferPosn += i - 1;
break;
}
// Any mismatching byte means this was not the separator after all.
if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
sepPosn = 0;
break;
}
}
// A complete record separator has been matched.
if (sepPosn == separator.length) {
bufferPosn += i;
newline = true;
sepPosn = 0;
break;
}
}
}
int readLength = this.bufferPosn - startPosn;
bytesConsumed += readLength;
// Truncate the record so it never exceeds maxLineLength bytes.
if (readLength > maxLineLength - txtLength) {
readLength = maxLineLength - txtLength;
}
if (readLength > 0) {
record.append(this.buffer, startPosn, readLength);
txtLength += readLength;
// Strip the trailing separator before exposing the record.
if (newline) {
str.set(record.getBytes(), 0, record.getLength() - separator.length);
}
}
} while (!newline && (bytesConsumed < maxBytesToConsume));
if (bytesConsumed > (long) Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: "
+ bytesConsumed);
}
return (int) bytesConsumed;
}
public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);
}
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
}
}
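To make the reader's behavior concrete, here is a minimal mapper sketch. It is an illustration only: the class name TrackMapper and the identity-style output are assumptions, not part of the original code. The key passed to map() is the byte offset of the record within the file, and the value is one whole record with the trailing "]@\n" separator already stripped, so '\n' characters embedded in a record survive.

package input;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TrackMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text record, Context context)
            throws IOException, InterruptedException {
        // "record" may span several physical lines; only "]@\n" ends a record.
        context.write(offset, record);
    }
}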
The separator is the one field you will usually want to change; it is declared at the top of TrackRecordReader:

private byte[] separator = "]@\n".getBytes();
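If the separator needs to vary between jobs, one option is to read it from the job configuration in initialize() instead of hard-coding it. A minimal sketch, assuming a made-up property name track.record.separator (this key is an invention for illustration, not a standard Hadoop setting):

// Inside TrackRecordReader.initialize(), after obtaining the Configuration:
this.separator = job.get("track.record.separator", "]@\n")
        .getBytes(java.nio.charset.StandardCharsets.UTF_8);

// And in the driver, before submitting the job:
conf.set("track.record.separator", "]@\n");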