这篇文章主要介绍了MapReduce主要接口有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
(1) InputFormat接口
用户需要实现该接口以指定输入文件的内容格式。该接口有两个方法
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
}
其中getSplits函数将所有输入数据分成numSplits个split,每个split交给一个map task处理。getRecordReader函数提供一个用户解析split的迭代器对象,它将split中的每个record解析成key/value对。
Hadoop本身提供了一些InputFormat:
TextInputFormat
作为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型。
KeyValueTextInputFormat
同样用于读取文件,如果行被分隔符(缺省是tab)分割为两部分,第一部分为key,剩下的部分为value;如果没有分隔符,整行作为 key,value为空。
SequenceFileInputFormat
用于读取sequence file。 sequence file是Hadoop用于存储数据自定义格式的binary文件。它有两个子类:SequenceFileAsBinaryInputFormat,将 key和value以BytesWritable的类型读出;SequenceFileAsTextInputFormat,将key和value以Text类型读出。
SequenceFileInputFilter
根据filter从sequence文件中取得部分满足条件的数据,通过 setFilterClass指定Filter,内置了三种 Filter,RegexFilter取key值满足指定的正则表达式的记录;PercentFilter通过指定参数f,取记录行数%f==0的记录;MD5Filter通过指定参数f,取MD5(key)%f==0的记录。
NLineInputFormat
可以将文件以行为单位进行split,比如文件的每一行对应一个map。得到的key是每一行的位置偏移量(LongWritable类型),value是每一行的内容,Text类型。
MultipleInputs
用于多个数据源的join
(2)Mapper接口
用户需继承Mapper接口实现自己的Mapper,Mapper中必须实现的函数是
Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。
(3)Partitioner接口
用户需继承该接口实现自己的Partitioner以指定map task产生的key/value对交给哪个reduce task处理,好的Partitioner能让每个reduce task处理的数据相近,从而达到负载均衡。Partitioner中需实现的函数是
getPartition( K2 key, V2 value, int numPartitions)
该函数返回<K2 V2>对应的reduce task ID。
用户如果不提供Partitioner,Hadoop会使用默认的(实际上是个hash函数)。
Partitioner如何使用
•实现Partitioner接口覆盖getPartition()方法
•Partitioner示例
public static class MyPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numPartitions) {
}
}
Partitioner需求示例
•需求描述
•数据文件中含有省份
•需要相同的省份送到相同的Reduce里
•从而产生不同的文件
•步骤
•实现Partitioner,覆盖getPartition
•根据省份字段进行切分
(4)Combiner
combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>作为输入到reduce函数中,其格式与reduce函数相同。Combiner使得map task与reduce task之间的数据传输量大大减小,可明显提高性能。大多数情况下,Combiner与Reducer相同。
什么情况下可以使用Combiner
•可以对记录进行汇总统计的场景,如求和。
•求平均数的场景就不可以使用了
Combiner执行时机
•运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,即 min.num.spill.for.combine(default 3)
•当job中设定了combiner,并且spill数最少有3个的时候,那么combiner函数就会在merge产生结果文件之前运行
•通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。
•Combiner也有可能不执行, Combiner会考虑当时集群的负载情况。
Combiner如何使用
•继承Reducer类
public static class Combiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
}
}
(5)Reducer接口
实现自己的Reducer,必须实现reduce函数
(6)OutputFormat
用户通过OutputFormat指定输出文件的内容格式,不过它没有split。每个reduce task将其数据写入自己的文件,文件名为part-nnnnn,其中nnnnn为reduce task的ID。
public abstract class OutputFormat<K, V> {
/**
* 创建一个记录写入器
*/
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
/**
* 检查结果输出的存储空间是否有效
*/
public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
/**
* 创建一个任务提交器
*/
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}
TextOutputFormat,输出到纯文本文件,格式为 key + " " + value。
NullOutputFormat,hadoop中的/dev/null,将输出送进黑洞。
SequenceFileOutputFormat, 输出到sequence file格式文件。
MultipleSequenceFileOutputFormat, MultipleTextOutputFormat,根据key将记录输出到不同的文件。
DBInputFormat和DBOutputFormat,从DB读取,输出到DB。
感谢你能够认真阅读完这篇文章,希望小编分享的“MapReduce主要接口有哪些”这篇文章对大家有帮助,同时也希望大家多多支持天达云,关注天达云行业资讯频道,更多相关知识等着你来学习!