This article walks through a worked example of secondary sorting (auxiliary sorting) in Hadoop MapReduce: using a composite key, a custom partitioner, and a grouping comparator to find each year's maximum temperature.
1. Requirement
Find the maximum temperature recorded in each year.
2. Sample data
1995 10
1996 11
1995 16
1995 22
1996 26
1995 3
1996 7
1996 10
1996 20
1996 33
1995 21
1996 9
1995 31
1995 -13
1995 22
1997 -2
1997 28
1997 15
1995 8
3. Approach and code
Sort the records so that they are grouped by year and, within each year, ordered by temperature in descending order, then deliver all of a year's records to the same reducer group; the first record of each group is then that year's maximum temperature. The key points of the implementation are:
a. Define a composite key made up of the natural key (the year) and the natural value (the temperature).
b. Sort records by the composite key.
c. Partition and group by the natural key alone, ignoring the temperature part of the composite key.
The composite key class comes first:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key for the secondary sort: holds the year (first) and the
 * temperature (second).
 */
public class IntPair implements WritableComparable<IntPair> {
    private IntWritable first;
    private IntWritable second;

    public IntPair() {
        this.first = new IntWritable();
        this.second = new IntWritable();
        // If these two fields were left uninitialized, deserialization would
        // fail with java.lang.NullPointerException at IntPair.readFields.
    }

    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    public IntWritable getFirst() {
        return first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof IntPair) {
            IntPair ip = (IntPair) obj;
            return first.get() == ip.first.get() && second.get() == ip.second.get();
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(IntPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp == 0) {
            cmp = second.compareTo(o.second);
        }
        return cmp;
    }
}
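A quick way to see why the no-argument constructor must initialize both fields: the framework creates key instances reflectively and then calls readFields on them during the shuffle. The following standalone sketch (hypothetical test code, not part of the job) reproduces that round trip:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IntPairRoundTrip {
    public static void main(String[] args) throws IOException {
        IntPair original = new IntPair(1995, 31);

        // Serialize, as the map output collector would.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a default-constructed instance, as the framework
        // does; readFields would throw a NullPointerException here if the
        // no-arg constructor left first/second null.
        IntPair copy = new IntPair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // prints: 1995	31
    }
}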
The driver class wires the mapper, reducer, partitioner, and comparators together:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split on any whitespace so both tab- and space-separated records parse.
            String[] val = value.toString().trim().split("\\s+");
            if (val.length == 2) {
                context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])), NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Each group holds one year's records sorted by temperature descending,
            // so the first key of the group carries the year's maximum temperature.
            context.write(key, NullWritable.get());
        }
    }

    // Partition by the first field (year) only.
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            // Mask the sign bit so the result is non-negative.
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by the first field (year) only, so one reduce() call sees a whole year.
    public static class GroupComparator extends WritableComparator {
        private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator();

        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // An IntWritable serializes as a fixed 4-byte big-endian int, so the
            // year occupies the first 4 bytes of each serialized key.
            return INT_COMPARATOR.compare(b1, s1, 4, b2, s2, 4);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst());
            }
            return super.compare(a, b);
        }
    }

    // Order the composite keys: year ascending, then temperature descending.
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                IntPair ip1 = (IntPair) a;
                IntPair ip2 = (IntPair) b;
                int cmp = ip1.getFirst().compareTo(ip2.getFirst()); // ascending (year)
                if (cmp != 0) {
                    return cmp;
                }
                return -ip1.getSecond().compareTo(ip2.getSecond()); // descending (temperature)
            }
            return super.compare(a, b);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // supplied by ToolRunner
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Wrong number of parameters; expected: <input> <output>");
            return -1;
        }
        Path inputPath = new Path(otherArgs[0]);
        Path outputPath = new Path(otherArgs[1]);
        //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");
        Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort");
        //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar");
        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class); // without this, sorting falls back to the key's compareTo
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
        System.exit(exitCode);
    }
}
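To try the job, package both classes into a jar and submit it through the hadoop launcher. The jar name and paths below are only placeholders:

hadoop jar secondary-sort.jar MaxTemperatureUsingSecondarySort input/temperatures output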
4. Results
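Given the sample data above, each year's group reaches the reducer with its records sorted by temperature in descending order, and only the first record of each group is written, so the output should be:

1995	31
1996	33
1997	28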