This article walks through how to extend Hadoop Streaming with multiple outputs using a custom PrefixMultipleOutputFormat.
PrefixMultipleOutputFormat provides two features:

- route records to different output directories according to the key prefix
- remove the trailing tab from the final output
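To make the convention concrete, here is a sketch of how the two features behave, based on the implementation shown further down in this article (the exact part-file names follow the reducer index, e.g. part-00000 for reducer 0):

```bash
# Key-prefix routing convention implemented by generateFileNameForKeyValue:
# a key of the form "X#rest" (one-character prefix followed by '#') is
# written to the subdirectory X/ with the "X#" prefix stripped.
#
#   reducer emits "A#foo"  ->  $output/A/part-00000-A gets the line "foo"
#   reducer emits "B#bar"  ->  $output/B/part-00000-B gets the line "bar"
#   keys without an "X#" prefix stay in the default part-NNNNN file
#
# With -D com.sogou.adt.adts.ignoreseparator=true the trailing tab that
# TextOutputFormat would otherwise append is dropped as well.
```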
## Usage

### Routing output to different directories by key prefix
```bash
$maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -libjars ./adts.jar \
    -D mapred.job.name=$name \
    -D mapred.reduce.tasks=5 \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
    -input $input \
    -output $output \
    -mapper ./m_mapper.sh \
    -reducer ./m_reducer.sh \
    -file m_mapper.sh \
    -file m_reducer.sh
```
Here `-outputformat` specifies the class we implemented ourselves, and `-libjars ./adts.jar` ships our own jar with the job.
### The mapper and reducer scripts

#### m_mapper.sh
```bash
#!/bin/bash
awk -F " " '{
    for (i = 1; i <= NF; i++)
        print $i;
}'
```
#### m_reducer.sh
```bash
#!/bin/bash
awk -F "\t" '{
    if (NR % 3 == 0) print "A#"$1;
    if (NR % 3 == 1) print "B#"$1;
    if (NR % 3 == 2) print "C#"$1;
}'
```
With this, the numbers end up in different output paths according to their A#/B#/C# prefixes.
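Before submitting the job, the two scripts can be sanity-checked with a plain local pipeline. This only roughly simulates map -> sort -> reduce on one machine; the real shuffle is done by Hadoop:

```bash
# Run the mapper and reducer locally on a toy input; every line the reducer
# emits should start with A#, B# or C#, which is the prefix the custom
# OutputFormat later uses to choose the output directory.
echo "1 2 3 4 5 6" | bash m_mapper.sh | sort | bash m_reducer.sh
```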
### Removing the trailing tab

To drop the tab at the end of each output line, just add `-D com.sogou.adt.adts.ignoreseparator=true` to the command:
```bash
$maserati_hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -libjars ./adts.jar \
    -D mapred.job.name=$name \
    -D mapred.reduce.tasks=5 \
    -D com.sogou.adt.adts.ignoreseparator=true \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -outputformat com.sogou.adt.adts.PrefixMultipleOutputFormat \
    -input $input \
    -output $output \
    -mapper ./m_mapper.sh \
    -reducer ./m_reducer.sh \
    -file m_mapper.sh \
    -file m_reducer.sh
```
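After the job finishes, both features can be checked from the shell. The paths below follow the naming produced by generateFileNameForKeyValue (shown in the source further down), so treat them as a sketch rather than exact output:

```bash
# The key prefixes should have produced per-prefix subdirectories.
hadoop fs -ls "$output"                 # expect A/, B/, C/ next to _SUCCESS

# With ignoreseparator=true there should be no trailing tab; cat -A would
# render a tab before the end of the line as ^I.
hadoop fs -cat "$output/A/part-*" | head -n 3 | cat -A
```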
## How PrefixMultipleOutputFormat is implemented

I am not really familiar with Java, and the little Java I learned in college was long ago returned to the teacher ^v^, so setting up the build environment took a while. Fortunately I had a ready-made Eclipse Java setup and a Hadoop environment built two years ago (it only needed a small fix before it could run jobs again, which was a relief).
### My environment

A quick note on this. Before compiling I was worried about where to find the jars that Hadoop Streaming depends on and whether I would have to build them myself (compiling all of the Hadoop source is a bit of a headache). It turned out that every required jar ships with the Hadoop runtime, which was a relief.
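For reference, a minimal build might look like the sketch below; the source path and output jar name are assumptions for illustration, and `hadoop classpath` is used so the compiler picks up the jars that ship with the local Hadoop installation:

```bash
# Compile the OutputFormat against the locally installed Hadoop jars and
# package it as the adts.jar passed to -libjars (paths are illustrative).
mkdir -p classes
javac -cp "$(hadoop classpath)" -d classes \
    src/com/sogou/adt/adts/PrefixMultipleOutputFormat.java
jar cf adts.jar -C classes .
```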
### Source code

The code is easy to follow. It defines a LineRecordWriter class (mostly lifted from the existing TextOutputFormat, with small changes to read the job configuration and to suppress the trailing tab), and generateFileNameForKeyValue routes records to different directories based on the key prefix. The code speaks for itself:
```java
package com.sogou.adt.adts;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class PrefixMultipleOutputFormat extends MultipleTextOutputFormat<Text, Text> {

  @Override
  protected Text generateActualKey(Text key, Text value) {
    return super.generateActualKey(key, value);
  }

  protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value) throws IOException {
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized void close(Reporter reporter) throws IOException {
      out.close();
    }
  }

  @Override
  protected RecordWriter<Text, Text> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException {
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
    // When com.sogou.adt.adts.ignoreseparator=true, write no separator at all,
    // which removes the trailing tab for records with an empty value.
    boolean ignoreseparator = job.getBoolean("com.sogou.adt.adts.ignoreseparator", false);
    if (ignoreseparator) {
      keyValueSeparator = "";
    }
    if (!isCompressed) {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, arg3);
      return new LineRecordWriter<Text, Text>(fileOut, keyValueSeparator);
    } else {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, GzipCodec.class);
      // create the named codec
      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
      // build the filename including the extension
      Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
      fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, arg3);
      return new LineRecordWriter<Text, Text>(
          new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
    }
  }

  @Override
  protected String generateFileNameForKeyValue(Text key, Text value, String name) {
    int keyLength = key.getLength();
    String outputName = name;
    if (keyLength < 2)
      return outputName;
    // Keys shaped like "X#rest" are routed to the subdirectory X/ and the
    // "X#" prefix is stripped from the key before it is written out.
    Text sep = new Text();
    sep.append(key.getBytes(), 1, 1);
    if (sep.find("#") != -1) {
      Text newFlag = new Text();
      newFlag.append(key.getBytes(), 0, 1);
      String flag = newFlag.toString();
      //outputName = name + "-" + flag;
      outputName = flag + "/" + name + "-" + flag;
      Text newValue = new Text();
      newValue.append(key.getBytes(), 2, keyLength - 2);
      key.set(newValue);
    }
    System.out.printf("[shishuai] key [%s] value [%s] output [%s]\n",
        key.toString(), value.toString(), outputName);
    return outputName;
  }
}
```
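A small side note: the System.out.printf debug line in generateFileNameForKeyValue goes to the reduce task's stdout log. If the cluster runs YARN with log aggregation enabled, it can be retrieved after the job with something like the following (the application id is a placeholder; use the one printed when the job was submitted):

```bash
# Pull the aggregated container logs and grep for the debug lines
# (the application id below is a placeholder).
yarn logs -applicationId application_1234567890123_0001 | grep '\[shishuai\]'
```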
That is everything about extending Hadoop Streaming with multiple outputs. Thanks for reading, and I hope it helps.