本篇内容主要讲解“hadoop datajoin有什么用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“hadoop datajoin有什么用”吧!
hadoop datajoin 之reduce side join
hadoop提供了一个叫datajoin的jar包,用于解决两张表关联的问题。jar位于/hadoop/contrib/datajoin将jar包引入进行开发。
涉及的几个概念:
1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
Customers Orders
1,Stephanie Leung,555-555-5555 3,A,12.95,02-Jun-2008
2,Edward Kim,123-456-7890 1,B,88.25,20-May-2008
3,Jose Madriz,281-330-8004 2,C,32.00,30-Nov-2007
4,David Stork,408-555-0000 3,D,25.02,22-Jan-2009
2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。
使用以下样例数据
customers-20140716
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
orders-20140716
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
请看流程图
第一部分,自定义的数据类型。数据类型主要包含两部分tag和data,tag是给数据打上的标签用来表示数据来源于哪个文件。data是数据记录。
上代码:
public class TaggedWritable extends TaggedMapOutput {
private Text data;
public TaggedWritable() {
this.tag = new Text("");
this.data = new Text("");
}
public TaggedWritable(Text data) {
this.data = data;
}
@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
this.data.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
this.data.write(out);
}
@Override
public Text getData() {
return data;
}
}
第二部分,map函数
public class Mapclass extends DataJoinMapperBase{
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = aRecord.getData().toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
}
@Override
protected Text generateInputTag(String inputFile) {
String datasource = inputFile.split("-")[0];
return new Text(datasource);
}
@Override
protected TaggedWritable generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable(new Text(value.toString()));
retv.setTag(this.inputTag);
return retv;
}
}
map函数中要特别注意protected TaggedWritable generateTaggedMapOutput(Object value) 该方法的返回类型为你第一步定义的类型。
第三部分,reduce
public class Reduce extends DataJoinReducerBase{
@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2) {
return null;
}
String joinedStr = "";
for (int i=0; i<values.length; i++) {
if (i > 0){
joinedStr += ",";
}
TaggedWritable tw = (TaggedWritable) values[i];
String line = tw.getData().toString();
System.out.println("line=========:"+line);
String[] tokens = line.split(",", 2);
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
reduce过程会将主键与数据组合在一起输出,你拼接的字符串中无需写入主键。
public class Datajoin extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
JobConf job = new JobConf(conf, Datajoin.class);
job.setJarByClass(Datajoin.class);
Path in = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/");
Path out = new Path("hdfs://172.16.0.87:9000/user/jeff/datajoin/out");
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(Mapclass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TaggedWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Datajoin(), args);
System.exit(res);
}
}
运行mapreduce任务后的输出为:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
可以在reduce的combin函数中控制函数的输出形式,左联,或者右联。
到此,相信大家对“hadoop datajoin有什么用”有了更深的了解,不妨来实际操作一番吧!这里是天达云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!