本篇内容主要讲解“MapReduce怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce怎么使用”吧!
什么是MR
MR是一种分布计算模型,主要用来解决海量数据的计算问题的。它包含了两种计算函数,一个是Mapping,另外一个是Reducing。Mapping对集合内的每个目标做同一个操作,Reduceing则是遍历集合中的元素返回一个综合的结果。我们操作代码时,只需要重写map和reduce方法就行,十分简单。这两个函数的形参都是k,v对,当数据量到达10PB以上时,则会速度变慢。
MR执行过程
MR程序启动时,会把输入文件转化成<k1,v1>键值对传给map函数,有几个键值对就执行几次map函数,但不是说有几个键值对就有几个Mapper进程,这是不对的。经过map函数处理,变成<k2,v2>键值对。由<k2,v2>转变成reduce函数的输入<k2,{v2,.....}>的过程被称之为shuffle。shuffle并不是象map和reduce这样的某个函数,不是需要单独拿出节点运行的,它仅仅只是一个过程。<k2,{v2...}>进过reduce函数处理,变成了最后的输出<k3,v3>。在到达reduce函数之前,键值对的数目是不变的。
Map阶段
(1).根据输入文件解析成<k1,v1>对,每一对调用一次map函数
(2).根据自己编写的map函数,将键值对处理,变成新的<k2,v2>键值对输出
(3).对输出的键值对进行分区,不同分区对应着不同的Reducer进程
(4).每个分区中的键值对,根据key进行排序,分组。然后把相同key的val放到同一个集合中。
(5).进行规约(可选)
Reduce阶段
(1).多个map函数输出的kv对,按照不同分区,传输到不同的reduce节点上。
(2).将多个map函数输出的kv对合并,排序。根据reduce函数逻辑,处理<k2,{v2..}>,转换成新的键值对输出
(3).输出保存文件
3.简单例子
Wordcount
public class WordCount {
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Text k2=new Text();
LongWritable v2=new LongWritable();
@Override
protected void map(LongWritable k1, Text v1,Context context)
throws IOException, InterruptedException {
String[] words=v1.toString().split("\t");
for (String string : words) {
k2.set(string);
v2.set(1L);
context.write(k2, v2);
}
}
}
public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
LongWritable v3=new LongWritable();
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable longWritable : v2s) {
sum=sum+longWritable.get();
}
v3.set(sum);
context.write(k2, v3);
}
}
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf, WordCount.class.getSimpleName());
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));
job.waitForCompletion(true);
}
}
4.MR的序列化
序列化就是把结构化的对象转换为字节流,在MR中,他没有用java自己的序列化,而是自己实现了一套序列化。因为相比较而言,hadoop的序列化有着诸多优点。在mr程序中,我们的参数和输出的键值对全都是实现了序列化的对象,当我们需要自订一个序列化对象,该如何操作呢?只需要实现Writable接口即可,当然key需要实现WritableComparable接口,因为需要根据key来排序和分组。
接着有个小例子来展示序列化。就是电信流量的处理例子。
public class LiuLiang {
public static class MyMapper extends Mapper<LongWritable, Text, Text, MyArrayWritable>{
Text k2=new Text();
MyArrayWritable v2=new MyArrayWritable();
LongWritable v21=new LongWritable();
LongWritable v22=new LongWritable();
LongWritable v23=new LongWritable();
LongWritable v24=new LongWritable();
LongWritable[] values=new LongWritable[4];
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
String[] words=v1.toString().split("\t");
k2.set(words[1]);
v21.set(Long.parseLong(words[6]));
v22.set(Long.parseLong(words[7]));
v23.set(Long.parseLong(words[8]));
v24.set(Long.parseLong(words[9]));
values[0]=v21;
values[1]=v22;
values[2]=v23;
values[3]=v24;
v2.set(values);
context.write(k2, v2);
}
}
public static class MyReduce extends Reducer<Text, MyArrayWritable, Text, Text>{
Text v3=new Text();
@Override
protected void reduce(Text k2, Iterable<MyArrayWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum1=0;
long sum2=0;
long sum3=0;
long sum4=0;
for (MyArrayWritable myArrayWritable : v2s) {
Writable[] values= myArrayWritable.get();
sum1=sum1+((LongWritable)values[0]).get();
sum2=sum2+((LongWritable)values[1]).get();
sum3=sum3+((LongWritable)values[2]).get();
sum4=sum4+((LongWritable)values[3]).get();
}
v3.set("\t"+sum1+"\t"+sum2+"\t"+sum3+"\t"+sum4);
context.write(k2, v3);
}
}
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf, LiuLiang.class.getSimpleName());
job.setJarByClass(LiuLiang.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MyArrayWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));
job.waitForCompletion(true);
}
}
class MyArrayWritable extends ArrayWritable{
public MyArrayWritable(){
super(LongWritable.class);
}
public MyArrayWritable(String[] arg0) {
super(arg0);
}
}
5.SequenceFile
在HDFS的学习中,提到了小文件的解决方案,其中一个便是这个SequenceFile。他是一种无序存储,将kv对序列化到文件中,从而合并许多小文件并且支持压缩。缺点是必须遍历才能查看里面各个小文件。
public class SequenceFileTest {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");
//Write(conf, fileSystem);
Read(conf, fileSystem);
}
private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {
Reader reader=new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);
Text key=new Text();
Text val=new Text();
while(reader.next(key, val)){
System.out.println(key.toString()+"----"+val.toString());
}
IOUtils.closeStream(reader);
}
private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {
Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);
Collection<File> files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);
for (File file : files) {
Text text = new Text();
text.set(FileUtils.readFileToString(file));
writer.append(new Text(file.getName()), text);
}
IOUtils.closeStream(writer);
}
}
到此,相信大家对“MapReduce怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是天达云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!