In this post I'd like to share how to scan an HBase table with MapReduce and build a Solr index from it. I hope you get something out of it; let's work through it together!
package com.hbase.index;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RebuildHbaseIndex {
public static final Logger LOG = LoggerFactory
.getLogger(RebuildHbaseIndex.class);
public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
//每次读取100条数据
conf.setInt("hbase.client.scanner.caching", 100);
String[] tbNames={"Suggest"};
for(int i=0;i<tbNames.length;i++){
Job job = SolrIndexerMapper.createSubmittableJob(conf, tbNames[i]);
if (job == null) {
System.exit(-1);
}
job.waitForCompletion(true);
Counter counter = job.getCounters().findCounter(SolrIndexerMapper.Counters.ROWS);
LOG.info("tbNames[i]: Put " + counter.getValue() + " records to Solr!"); // 打印日志
}
}
}
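The driver above reruns the indexing job, but it never removes documents left over from an earlier run. If a true rebuild from scratch is wanted, a small standalone step like the sketch below could be run first. This is illustrative only: it assumes the same SolrJ 4.x API and the collection URL hard-coded in the mapper, and the class name ClearTableIndex is made up for this example.
package com.hbase.index;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
public class ClearTableIndex {
    public static void main(String[] args) throws Exception {
        // same collection URL that SolrIndexerMapper hard-codes
        SolrServer solr = new HttpSolrServer("http://192.168.1.79:8983/solr/IK_shard1_replica1");
        // every document written by the mapper carries a tableName field,
        // so one delete-by-query removes all previously indexed rows of that table
        solr.deleteByQuery("tableName:Suggest");
        solr.commit();
        solr.shutdown();
    }
}
The mapper that does the actual indexing follows.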
package com.hbase.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SolrIndexerMapper extends TableMapper<Text, Text> {
public static final Logger LOG = LoggerFactory.getLogger(SolrIndexerMapper.class);
// counter for the number of rows written to Solr
public static enum Counters {ROWS};
// create only one SolrServer instance per map task
private SolrServer solr;
public String solrURL="http://192.168.1.79:8983/solr/IK_shard1_replica1";
private int commitSize;
private final List<SolrInputDocument> docs=new ArrayList<SolrInputDocument>();
// called once when the map task starts
@Override
protected void setup(Context context){
Configuration conf=context.getConfiguration();
solr=new HttpSolrServer(solrURL);
// number of documents to buffer before sending a batch to Solr
commitSize=conf.getInt("solr.commit.size", 1000);
}
@Override
protected void map(ImmutableBytesWritable row, Result values,Context context)throws IOException, InterruptedException {
SolrInputDocument solrDoc = new SolrInputDocument();
String rowkey=Bytes.toString(values.getRow());
String id=rowkey; // the rowkey doubles as the unique Solr document id
String tableName="Suggest";
solrDoc.addField("id", id);
solrDoc.addField("rowkey", rowkey);
// record the source table; the Solr schema needs a tableName field for this
solrDoc.addField("tableName", tableName);
for (KeyValue kv : values.list()) {
String fieldName = Bytes.toString(kv.getQualifier());
String fieldValue = Bytes.toString(kv.getValue());
solrDoc.addField(fieldName, fieldValue);
}
docs.add(solrDoc);
if (docs.size() >= commitSize) {
try {
LOG.info("添加文档:Adding " + Integer.toString(docs.size()) + " documents");
solr.add(docs); // 索引文档
} catch (final SolrServerException e) {
final IOException ioe = new IOException();
ioe.initCause(e);
throw ioe;
}
docs.clear();
}
context.getCounter(Counters.ROWS).increment(1);
}
// called once when the map task finishes; flush any buffered documents
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
try {
if(!docs.isEmpty()){
LOG.info("Flushing " + Integer.toString(docs.size()) + " remaining documents");
solr.add(docs);
docs.clear();
}
solr.commit(); // commit so the newly indexed documents become searchable
} catch (final SolrServerException e) {
final IOException ioe=new IOException();
ioe.initCause(e);
throw ioe;
}
}
public static Job createSubmittableJob(Configuration conf, String tableName) throws IOException {
Job job=Job.getInstance(conf,"SolrIndex_" + tableName);
job.setJarByClass(SolrIndexerMapper.class);
Scan scan=new Scan();
// don't fill the HBase block cache with this one-off full scan
scan.setCacheBlocks(false);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
SolrIndexerMapper.class, null, null, job); // no map output is needed, so the output key/value classes are null
job.setNumReduceTasks(0); // map-only job, no reduce phase
return job;
}
}
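Once the job completes, a quick SolrJ query can confirm that documents actually landed in the collection. The sketch below is illustrative only; it assumes the same collection URL and the id/rowkey/tableName fields the mapper writes, and the class name CheckSolrIndex is made up for this example.
package com.hbase.index;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
public class CheckSolrIndex {
    public static void main(String[] args) throws Exception {
        SolrServer solr = new HttpSolrServer("http://192.168.1.79:8983/solr/IK_shard1_replica1");
        // count the documents indexed from the Suggest table and print a few rowkeys
        SolrQuery query = new SolrQuery("tableName:Suggest");
        query.setRows(5);
        QueryResponse rsp = solr.query(query);
        System.out.println("numFound = " + rsp.getResults().getNumFound());
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("rowkey"));
        }
        solr.shutdown();
    }
}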
Having read this article, you should now have a reasonable picture of how MapReduce can scan an HBase table and build a Solr index. If you would like to learn more, follow the 天达云 industry news channel. Thanks for reading!