一、主程序调用
SolrDeleteDuplicates dedup = new SolrDeleteDuplicates(); dedup.setConf(getConf()); dedup.dedup(solrUrl);
二、job任务配置
JobConf job = new NutchJob(getConf());
job.setInputFormat(SolrInputFormat.class); job.setMapperClass(IdentityMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SolrRecord.class);
job.setReducerClass(SolrDeleteDuplicates.class); job.setOutputFormat(NullOutputFormat.class);
JobClient.runJob(job);
三、Map、reduce任务的输入和输出 Map任务输入、输出 public void map( K key, V val, OutputCollector<K, V> output
reduce任务输入、输出 输入:Text/Iterator<SolrRecord> 输出:Text/SolrRecord
public void reduce( Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output
四、job任务输入类SolrInputFormat
getSplits方法将所有的文档按照数量平均分片 getRecordReader方法中利用solrserver查询了当前分片包含的所有doc记录,solrrecord返回了的当前的RecordReader<Text, SolrRecord>记录(RecordReader是一个全局的变量),并且有获取下一个方法。
(1)、SolrInputFormat的getSplits方法
1、根据job对象的参数,获取solrserver对象。 2、构建并执行查询(查询参数:[*:*、id、setRow(1)] ),获取响应对象 3、根据响应对象获取索引总数,除以分片数,得到每一片分配多少个索引 4、根据分片数创建 SolrInputSplit数组对象, 5、根据solr输入分片的开始和结束位置,实例化SolrInputSplit对象
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD); solrQuery.setRows(1);
QueryResponse response; try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); }
int numResults = (int)response.getResults().getNumFound(); int numDocsPerSplit = (numResults / numSplits); int currentDoc = 0; SolrInputSplit[] splits = new SolrInputSplit[numSplits]; for (int i = 0; i < numSplits - 1; i++) { splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit); currentDoc += numDocsPerSplit; } splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc);
return splits; }
(2)、SolrInputFormat的getRecordReader()方法
1、获取solrserver对象 2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数 3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ])。 4、根据响应对象,获取结果集 5、对匿名内部内RecordReader做了实现,并且返回
public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split, final JobConf job, Reporter reporter) throws IOException {
//1、获取solrserver对象 SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job);
//2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数 SolrInputSplit solrSplit = (SolrInputSplit) split; final int numDocs = solrSplit.getNumDocs(); //3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ]) SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD, SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD); solrQuery.setStart(solrSplit.getDocBegin()); solrQuery.setRows(numDocs);
QueryResponse response;
try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); }
//4、根据响应对象,获取结果集 final SolrDocumentList solrDocs = response.getResults();
return new RecordReader<Text, SolrRecord>() { //当前的文档 private int currentDoc = 0;
public void close() throws IOException { }
public Text createKey() { return new Text(); }
public SolrRecord createValue() { return new SolrRecord(); } //获取当前的指针 public long getPos() throws IOException { return currentDoc; } //获取进度 public float getProgress() throws IOException { return currentDoc / (float) numDocs; } //获取下一个 public boolean next(Text key, SolrRecord value) throws IOException { if (currentDoc >= numDocs) { return false; } // SolrDocument doc = solrDocs.get(currentDoc);
//获取摘要 String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD); //把摘要作为key key.set(digest); //value(SolrRecord) //赋值:通过doc给solrrecord的id,tstamp,boost 3个字段赋值 value.readSolrDocument(doc); //指针加自增1 currentDoc++; return true; } }; }
五、map()方法和reduce()方法中的实现
(1)、map任务 (2)、reduce任务
去重逻辑:
reduce任务会遍历每一个record,并执行reduce()方法中的代码 reduce()方法中,会遍历处于当前文档之后的所有文档,如果分值和时间都比当前的小,会调用solrj删除这个文档,如果比当前的大,会删除当前的,并把当前的替换成这个大的。
public void reduce(Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output, Reporter reporter) throws IOException { //1、下一个SolrRecord对象 SolrRecord recordToKeep = new SolrRecord(values.next());
//2、遍历了SolrRecord while (values.hasNext()) { // SolrRecord solrRecord = values.next();
//boost、tstamp参与比较 //如果当前的分值, 比保持的分支高,并且时间比保持的新,就根据id删除这条索引, if (solrRecord.getBoost() > recordToKeep.getBoost() || (solrRecord.getBoost() == recordToKeep.getBoost() && solrRecord.getTstamp() > recordToKeep.getTstamp())) { updateRequest.deleteById(recordToKeep.id); recordToKeep = new SolrRecord(solrRecord); } else { updateRequest.deleteById(solrRecord.id); } numDeletes++; reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1); if (numDeletes >= NUM_MAX_DELETE_REQUEST) { try { LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates"); updateRequest.process(solr); } catch (SolrServerException e) { throw new IOException(e); } updateRequest = new UpdateRequest(); numDeletes = 0; } } }
六、关于digest
doc中的digest字段,是在IndexerMapReduce类中的reduce方法中加入的 // add digest, used by dedup doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
Metadata中包含了一个HashMap final Metadata metadata = parseData.getContentMeta(); |