小编给大家分享一下hadoop中wordcount 、wordmean的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
注意
hadoop src 可以在 hadoop官网上下载。
分析中/**开头的为源码自带,//开头的为作者心得。
hadoop安装环境下的/share/dc/hadoop/api下有自带的api链接,可以方便大家查看,学习。
wordcount:
package hadoop1;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
//split into words by blank just like "//s" in Regular Expression
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
//format :<word,1>, context.write() is the format in map&&reduce to output
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
//foreach value(a kind of word),
//do sum : <word1,1>,......,<word1,1>------><word1,n>, <word2,1>,......,<word2,1>------><word2,n>
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
//format <word1,sum1>,<word2,sum2>......
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//get in/out path, format:otherArgs ={input1,input2......,output}
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
//singleton
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
//combiner is just like a reducer on single node to reduce the net pressure,
//not all the task suit for combiner.
//so <key,1>,......,<key,1>------><key,n>
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
//mapper's output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//add new path for multinput (input1,input2...)
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
//0 for normal exit,else not normal
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
wordmean:
package hadoop2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
public class WordMean extends Configured implements Tool {
private double mean = 0;
private final static Text COUNT = new Text("count");
private final static Text LENGTH = new Text("length");
private final static LongWritable ONE = new LongWritable(1);
/**
* Maps words from line of text into 2 key-value pairs; one key-value pair for
* counting the word, another for counting its length.
*/
public static class WordMeanMapper extends
Mapper<Object, Text, Text, LongWritable> {
private LongWritable wordLen = new LongWritable();
/**
* Emits 2 key-value pairs for counting the word and its length. Outputs are
* (Text, LongWritable).
*
* @param value
* This will be a line of text coming in from our input file.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String string = itr.nextToken();
//map into format:word1,word2......,wordn.
//for each token(word) split by blank,
//set two kinds of<key,value>------><"count",1>and<"length",string.length>
this.wordLen.set(string.length());
context.write(LENGTH, this.wordLen);
context.write(COUNT, ONE);
}
}
}
/**
* Performs integer summation of all the values for each key.
*/
public static class WordMeanReducer extends
Reducer<Text, LongWritable, Text, LongWritable> {
// LongWritable is just like Long in java,
//to implement hadoop's own type is for Serialization and Anti serialization
private LongWritable sum = new LongWritable();
/**
* Sums all the individual values within the iterator and writes them to the
* same key.
*
* @param key
* This will be one of 2 constants: LENGTH_STR or COUNT_STR.
* @param values
* This will be an iterator of all the values associated with that
* key.
*/
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
//for two constants:"count" and "length",
//calculate the sum for each constant. :<count,1>,.....,<count,1> -------> <count,n>&&<length,3>,......,<length,4>-----><length,m>
int theSum = 0;
for (LongWritable val : values) {
theSum += val.get();
}
sum.set(theSum);
context.write(key, sum);
}
}
/**
* Reads the output file and parses the summation of lengths, and the word
* count, to perform a quick calculation of the mean.
*
* @param path
* The path to find the output file in. Set in main to the output
* directory.
* @throws IOException
* If it cannot access the output directory, we throw an exception.
*/
private double readAndCalcMean(Path path, Configuration conf)
throws IOException {
//read from reducers' output
FileSystem fs = FileSystem.get(conf);
Path file = new Path(path, "part-r-00000");
if (!fs.exists(file))
throw new IOException("Output not found!");
BufferedReader br = null;
// average = total sum(m in reduce) / number of elements(n in reduce);
try {
//BufferedReader is a Decorator to InputStreamReader,
/for add the method .readLine() and so on.
br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));
long count = 0;
long length = 0;
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
// grab type ------to spilt "count" and "length"
String type = st.nextToken();
// differentiate
if (type.equals(COUNT.toString())) {
String countLit = st.nextToken();
count = Long.parseLong(countLit);
System.out.println("The count is: " + count );//~ add by author :output total word count n
} else if (type.equals(LENGTH.toString())) {
String lengthLit = st.nextToken();
length = Long.parseLong(lengthLit);
System.out.println("The length is: " + length );//~ add by author :output total word length m
}
}
double theMean = (((double) length) / ((double) count));
System.out.println("The mean is: " + theMean);
return theMean;
} finally {
if (br != null) {
br.close();
}
}
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordMean(), args);
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordmean <in> <out>");
return 0;
}
Configuration conf = getConf();
Job job = Job.getInstance(conf, "word mean");
job.setJarByClass(WordMean.class);
job.setMapperClass(WordMeanMapper.class);
job.setCombinerClass(WordMeanReducer.class);
job.setReducerClass(WordMeanReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputpath);
boolean result = job.waitForCompletion(true);
mean = readAndCalcMean(outputpath, conf);
return (result ? 0 : 1);
}
/**
* Only valuable after run() called.
*
* @return Returns the mean value.
*/
public double getMean() {
return mean;
}
}
看完了这篇文章,相信你对“hadoop中wordcount 、wordmean的示例代码”有了一定的了解,如果想了解更多相关知识,欢迎关注天达云行业资讯频道,感谢各位的阅读!