This article walks through a worked example of MultipleOutputs in Hadoop: routing the records for different keys in a MapReduce job into separately named output files.
Original data and expected result: both were shown as screenshots in the original post and are not reproduced here. From the code below, the input is a text file of semicolon-separated "key;value" lines, and the goal is to route the records for the keys 中国, 美国, and 中国人 into three separate output files.
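A hypothetical input (only the "key;value" shape and the three keys are recoverable from the code; the values here are invented for illustration):

1.txt:
中国;记录A
美国;record B
中国人;记录C
中国;记录D

The expected result is then one file per key under the output directory: the 中国 records in china-r-00000, the 美国 records in usa-r-00000, and the 中国人 records in cpeople-r-00000.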
MyMapper.java
package com.xr.text;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line has the form "key;value": emit the part before
        // the semicolon as the key and the part after it as the value.
        String[] split = value.toString().split(";");
        context.write(new Text(split[0]), new Text(split[1]));
    }
}
MyReducer.java
package com.xr.text;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class MyReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    /**
     * Create the MultipleOutputs helper before the first reduce() call.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void reduce(Text k1, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        String key = k1.toString();
        for (Text t : value) {
            // Route each record to the named output registered for its key.
            if ("中国".equals(key)) {
                mos.write("china", new Text("中国"), t);
            } else if ("美国".equals(key)) {
                mos.write("usa", new Text("美国"), t);
            } else if ("中国人".equals(key)) {
                mos.write("cpeople", new Text("中国人"), t);
            }
        }
    }

    /**
     * Close MultipleOutputs, or the named output files may be left incomplete.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
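Two hedged usage notes. First, named-output names may contain only letters and digits, so "china", "usa", and "cpeople" are legal, while something like "c-people" would be rejected at registration time. Second, MultipleOutputs also offers a four-argument write() overload that takes a baseOutputPath, which shards output into subdirectories; a sketch of how it could be used inside the reduce() loop above (the "china/part" layout is my assumption, not from the original):

// Writes under <outputdir>/china/part-r-00000 instead of <outputdir>/china-r-00000.
mos.write("china", new Text("中国"), t, "china/part");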
JobTest.java
package com.xr.text;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class JobTest {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String inputPath = "hdfs://192.168.75.100:9000/1.txt";
        String outputPath = "hdfs://192.168.75.100:9000/ceshi";
        Job job = new Job();
        job.setJarByClass(JobTest.class);
        job.setMapperClass(MyMapper.class);
        // Register the named outputs; each name must match the first argument
        // passed to mos.write() in the reducer.
        MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "usa", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "cpeople", TextOutputFormat.class, Text.class, Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        // Configuration conf = new Configuration();
        // FileSystem fs = FileSystem.get(conf);
        //
        // if (fs.exists(new Path(outputPath))) {
        //     fs.delete(new Path(outputPath), true);
        // }
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
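One note on the commented-out cleanup block: with an empty Configuration, FileSystem.get(conf) resolves to the local file system, so it would never see the HDFS output path. A sketch of a version that should target the remote HDFS (my fix, not from the original):

// Delete the old output directory on HDFS before submitting the job.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(java.net.URI.create(outputPath), conf);
if (fs.exists(new Path(outputPath))) {
    fs.delete(new Path(outputPath), true); // true = recursive
}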
Submitting the job failed with the following error:
14/08/12 12:44:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/12 12:44:02 ERROR security.UserGroupInformation: PriviledgedActionException as:Xr cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Xr\mapred\staging\Xr-1514460710\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Xr\mapred\staging\Xr-1514460710\.staging to 0700
at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:918)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:912)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:912)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
at com.xr.text.JobTest.main(JobTest.java:37)
Workaround (this is the well-known hack for running Hadoop 1.x jobs from a Windows client, where RawLocalFileSystem cannot apply POSIX permissions to the local staging directory):
1. Delete FileUtil.class from hadoop-core-1.1.2.jar.
2. Copy /org/apache/hadoop/fs/FileUtil.java out of the Hadoop source tree into the project.
3. Comment out the body of its checkReturnValue() method so that it no longer throws, as sketched below.
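A minimal sketch of the patched method, based on the Hadoop 1.1.2 source (the exact message formatting may differ slightly between releases):

// In the copied org/apache/hadoop/fs/FileUtil.java:
private static void checkReturnValue(boolean rv, File p, FsPermission permission)
        throws IOException {
    // Body disabled: on Windows, java.io.File cannot apply POSIX permissions,
    // so rv comes back false and the original code threw
    // "Failed to set permissions of path: ... to 0700".
    // if (!rv) {
    //     throw new IOException("Failed to set permissions of path: " + p +
    //             " to " + String.format("%04o", permission.toShort()));
    // }
}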
Running the job again raised a second error:
java.lang.OutOfMemoryError: Java heap space
Solution (shown only as a screenshot in the original):
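The screenshot is not reproduced here. The usual fix for a heap-space OutOfMemoryError in a local Eclipse run is to enlarge the heap in the run configuration's VM arguments; the value below is an assumption, not taken from the original:

-Xmx512m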
After that, the job ran to completion.
It produced the following files (again a screenshot in the original; the listing below is inferred):
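Given how MultipleOutputs names its files ({namedOutput}-r-{partition}), the output directory should contain roughly:

/ceshi/china-r-00000
/ceshi/usa-r-00000
/ceshi/cpeople-r-00000
/ceshi/part-r-00000   (the default output; empty here, since the reducer never writes to context)
/ceshi/_SUCCESS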
Thanks for reading; that completes the MultipleOutputs example. Run it against your own cluster to verify the behavior for yourself.