HBase中怎么把数据写到HDFS文件中
更新:HHH   时间:2023-1-7


这篇文章主要讲解了“HBase中怎么把数据写到HDFS文件中”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“HBase中怎么把数据写到HDFS文件中”吧!

从HBase中选定某张表,比如blog,然后限定表中的某些列,比如昵称nickname,标签tags,将这些列的数据内容写入到HDFS文件中去。

/**
 * 选定HBase中的某张表,限定列,然后将其内容数据写入到HDFS文件中。
 * */
public class HBaseAndMapReduce2 {

	public static void main(String[] args) throws Exception {
		System.exit(run());
	}

	public static int run() throws Exception {
		Configuration conf = new Configuration();
		conf = HBaseConfiguration.create(conf);
		conf.set("hbase.zookeeper.quorum", "192.168.226.129");

		Job job = Job.getInstance(conf, "findFriend");
		job.setJarByClass(HBaseAndMapReduce2.class);


		Scan scan = new Scan();
		//取对业务有用的数据 tags, nickname
		scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
		scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

		//ImmutableBytesWritable来自hbase数据的类型
		/**
		 * public static void initTableMapperJob(String table, Scan scan,
		 * 		Class<? extends TableMapper> mapper,
		 * 		Class<?> outputKeyClass,
		 * 		Class<?> outputValueClass, Job job)
		 * */
		//确保 blog 表存在, 且表结构与本文一样。 
		TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriendMapper.class, 
				Text.class,  Text.class, job);
		DateFormat df = new SimpleDateFormat( "yyyyMMddHHmmssS" );
		
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.226.129:9000/hbasemapreduce1/" +df.format( new Date() )));

		job.setReducerClass(FindFriendReducer.class);
		
		return job.waitForCompletion(true) ? 0 : 1;
	}
    
    // 输入输出的键值
	public static class FindFriendMapper extends TableMapper<Text, Text>{
		//key是hbase中的行键
		//value是hbase中的所行键的所有数据
		@Override
		protected void map(
				ImmutableBytesWritable key,
				Result value,
				Mapper<ImmutableBytesWritable, Result,Text, Text>.Context context)
						throws IOException, InterruptedException {

			Text v = null;
			String[] kStrs = null;
			List<Cell> cs = value.listCells();
			for (Cell cell : cs) {
				System.out.println( "Cell--->: "+cell );
				if ("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					kStrs = Bytes.toString(CellUtil.cloneValue(cell)).split(",");
				}else if ("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
					v = new Text(CellUtil.cloneValue(cell));
				}
			}

			for (String kStr : kStrs) {
				context.write(new Text(kStr.toLowerCase()), v);
			}

		}
	}

	public static class FindFriendReducer extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
						throws IOException, InterruptedException {
			System.out.println(  "key--->"+key);
			StringBuilder sb = new StringBuilder();
			for (Text text : values) {
				System.out.println(  "value-->"+text);
				sb.append((sb.length() > 0 ? ",":"") + text.toString());
			}
			context.write(key, new Text(sb.toString()));
		}
	}
}

输出结构:

hadoop	Berg-OSChina,BergBerg
hbase	OSChina,BergBerg
zookeeper	OSChina,BergBerg

感谢各位的阅读,以上就是“HBase中怎么把数据写到HDFS文件中”的内容了,经过本文的学习后,相信大家对HBase中怎么把数据写到HDFS文件中这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是天达云,小编将为大家推送更多相关知识点的文章,欢迎关注!

返回云计算教程...