This article explains how to write data from HBase into a file on HDFS. The walkthrough is short and concrete, so follow along with the code below.
Pick a table in HBase, say blog, restrict it to a few columns, here the nickname column in the author family and the tags column in the article family, and write the contents of those columns into a file on HDFS.
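For context, the job below assumes a blog table with two column families, article (holding a tags cell with comma-separated tags) and author (holding a nickname cell); that schema is inferred from the Scan in the job code. The following sketch, with a hypothetical row key and cell values, shows one way such a table could be created and populated with the HBase client API before running the job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateBlogTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");
        try (Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("blog");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("article")); // holds the tags column
                desc.addFamily(new HColumnDescriptor("author"));  // holds the nickname column
                admin.createTable(desc);
            }
            try (Table table = conn.getTable(name)) {
                // Hypothetical sample row: tags are stored comma-separated in a single cell.
                Put put = new Put(Bytes.toBytes("1"));
                put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"),
                        Bytes.toBytes("Hadoop,HBase,Zookeeper"));
                put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"),
                        Bytes.toBytes("Berg"));
                table.put(put);
            }
        }
    }
}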
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Select a table in HBase, restrict it to certain columns,
 * and write the contents of those columns to a file on HDFS.
 */
public class HBaseAndMapReduce2 {

    public static void main(String[] args) throws Exception {
        System.exit(run());
    }

    public static int run() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "findFriend");
        job.setJarByClass(HBaseAndMapReduce2.class);

        Scan scan = new Scan();
        // Fetch only the columns the job actually needs: tags and nickname.
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        // ImmutableBytesWritable is the key type HBase hands to the mapper.
        /**
         * public static void initTableMapperJob(String table, Scan scan,
         *     Class<? extends TableMapper> mapper,
         *     Class<?> outputKeyClass,
         *     Class<?> outputValueClass, Job job)
         */
        // Make sure the blog table exists and matches the structure used in this article.
        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class,
                Text.class, Text.class, job);

        job.setReducerClass(FindFriendReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Timestamp the output directory so repeated runs do not collide.
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssS");
        FileOutputFormat.setOutputPath(job, new Path(
                "hdfs://192.168.226.129:9000/hbasemapreduce1/" + df.format(new Date())));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Mapper output key/value: tag -> nickname.
    public static class FindFriendMapper extends TableMapper<Text, Text> {

        // key is the HBase row key; value holds every cell the scan returned for that row.
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Text v = null;
            String[] kStrs = null;
            List<Cell> cs = value.listCells();
            for (Cell cell : cs) {
                System.out.println("Cell--->: " + cell);
                if ("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    kStrs = Bytes.toString(CellUtil.cloneValue(cell)).split(",");
                } else if ("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    v = new Text(CellUtil.cloneValue(cell));
                }
            }
            // Skip rows missing a tags or nickname cell rather than throwing an NPE.
            if (kStrs == null || v == null) {
                return;
            }
            // Emit one (tag, nickname) pair per tag so the reducer can group nicknames by tag.
            for (String kStr : kStrs) {
                context.write(new Text(kStr.toLowerCase()), v);
            }
        }
    }

    public static class FindFriendReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            System.out.println("key--->" + key);
            // Join all nicknames that share this tag into one comma-separated list.
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                System.out.println("value-->" + text);
                sb.append(sb.length() > 0 ? "," : "").append(text.toString());
            }
            context.write(key, new Text(sb.toString()));
        }
    }
}
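The class is typically packaged into a jar and submitted with the usual hadoop jar command. Note that this overload of initTableMapperJob ships the HBase dependency jars with the job by default, so the worker nodes do not need a local HBase installation on their classpath.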
Output (one line per tag; the value is the comma-separated list of nicknames of the authors whose articles carried that tag):
hadoop Berg-OSChina,BergBerg
hbase OSChina,BergBerg
zookeeper OSChina,BergBerg
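To double-check that the data actually landed on HDFS, the part-r-* files under the timestamped output directory can be read back with the FileSystem API. Here is a minimal sketch, assuming the same NameNode address as above; the directory name is a placeholder, since each run creates its own timestamped directory:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadJobOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.226.129:9000"), conf);
        // Placeholder: substitute the timestamped directory the job created.
        Path dir = new Path("/hbasemapreduce1/<timestamp>");
        for (FileStatus status : fs.listStatus(dir)) {
            // Skip _SUCCESS and other marker files; only read the reducer output.
            if (!status.getPath().getName().startsWith("part-")) {
                continue;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath())))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
}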