How do you implement a PageRank algorithm in Hadoop? This article gives a detailed analysis and answer to that question, in the hope of helping anyone facing the same problem find a simpler, workable approach.
The input file format is as follows. Each line holds a node id, then (after a tab) the node's initial PageRank value and the ids of the pages it links to, separated by spaces:
1 1.0 2 3 4 5 6 7 8
2 2.0 3 4 5 6 7 8
3 3.0 4 5 6 7 8
4 4.0 5 6 7 8
5 5.0 6 7 8
6 6.0 7 8
7 7.0 8
8 8.0 1 2 3 4 5 6 7
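Before the MapReduce version, it may help to see the computation the job distributes. Each iteration gives every page a base rank of 0.15 plus 0.85 times the rank flowing in from the pages that link to it, where each page splits its rank evenly across its outlinks. Below is a minimal single-machine sketch of that update on the eight-node graph above (PageRankSketch is just an illustrative name, not part of the Hadoop job); after the same 20 iterations it should land close to the rank values in the results at the end of this article.

import java.util.Arrays;

public class PageRankSketch {
    public static void main(String[] args) {
        // Adjacency lists for the eight-node sample graph above (1-based ids)
        int[][] links = {
            {2, 3, 4, 5, 6, 7, 8}, {3, 4, 5, 6, 7, 8}, {4, 5, 6, 7, 8},
            {5, 6, 7, 8}, {6, 7, 8}, {7, 8}, {8}, {1, 2, 3, 4, 5, 6, 7}
        };
        double[] pr = {1, 2, 3, 4, 5, 6, 7, 8};  // initial ranks from the input file
        for (int iter = 0; iter < 20; iter++) {
            double[] next = new double[8];
            Arrays.fill(next, 0.15);              // damping term, as in the reducer
            // Each node sends an equal share of its rank to every outlink
            for (int u = 0; u < 8; u++)
                for (int v : links[u])
                    next[v - 1] += 0.85 * pr[u] / links[u].length;
            pr = next;
        }
        for (int u = 0; u < 8; u++)
            System.out.printf("%d %.3f%n", u + 1, pr[u]);
    }
}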
The code is as follows:
package com.example.pagerank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRank {

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        private Text id = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Only process data lines, which always start with a node id
            if (line.substring(0, 1).matches("[0-9]")) {
                // A "_" marks a line produced by a previous iteration, where the
                // outlink list is prefixed with "_" to distinguish it from rank
                // contributions
                boolean flag = false;
                if (line.contains("_")) {
                    line = line.replace("_", "");
                    flag = true;
                }
                // values[0] is the node id; values[1] holds the rank and the outlinks
                String[] values = line.split("\t");
                Text t = new Text(values[0]);
                String[] vals = values[1].split(" ");
                // Rebuild the "_"-prefixed outlink list so the next iteration can reuse it
                String url = "_";
                double pr = 0;
                int i = 0;
                int num = 0;
                if (flag) {
                    // A line from a previous iteration: after stripping the "_",
                    // the current rank sits at vals[1] and the outlinks start at index 2
                    i = 2;
                    pr = Double.valueOf(vals[1]);
                    num = vals.length - 2;
                } else {
                    // A line from the original input: the rank is the first token
                    // and the outlinks start at index 1
                    i = 1;
                    pr = Double.valueOf(vals[0]);
                    num = vals.length - 1;
                }
                // Send each outlinked node an equal share of this node's rank
                for (; i < vals.length; i++) {
                    url = url + vals[i] + " ";
                    id.set(vals[i]);
                    Text prt = new Text(String.valueOf(pr / num));
                    context.write(id, prt);
                }
                // Also pass along the node's own outlink list
                context.write(t, new Text(url));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            String url = "";
            for (Text val : values) {
                // A value carrying the "_" marker is the outlink list; anything else
                // is a rank contribution from an inbound link and joins the sum
                if (!val.toString().contains("_")) {
                    sum = sum + Double.valueOf(val.toString());
                } else {
                    url = val.toString();
                }
            }
            // Damped PageRank update
            double pr = 0.15 + 0.85 * sum;
            String str = String.format("%.3f", pr);
            result.set(str + " " + url);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Location of the input file; change this to your own path
        String paths = "hdfs://localhost:9000/user/root/input11";
        String path2 = paths;
        String path3 = "";
        // Run 20 iterations, chaining each job's output into the next job's input
        for (int i = 1; i <= 20; i++) {
            System.out.println("This is the " + i + "th job!");
            System.out.println("path2:" + path2);
            System.out.println("path3:" + path3);
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "PageRank");
            path3 = paths + i;
            job.setJarByClass(PageRank.class);
            job.setMapperClass(MyMapper.class);
            // The reducer doubles as a combiner here; because the combiner already
            // applies the damping formula, each output value ends up carrying two
            // numbers, and the mapper reads the current rank from vals[1] accordingly
            job.setCombinerClass(MyReducer.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(path2));
            FileOutputFormat.setOutputPath(job, new Path(path3));
            path2 = path3;
            job.waitForCompletion(true);
            System.out.println(i + "th end!");
        }
    }
}
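To run the job, compile the class against the Hadoop libraries, package it into a jar, and submit it. The commands below are a sketch that assumes the jar is named pagerank.jar and the input file is file.txt (adjust the names to your setup):

hadoop fs -mkdir -p /user/root/input11
hadoop fs -put file.txt /user/root/input11
hadoop jar pagerank.jar com.example.pagerank.PageRank
hadoop fs -cat /user/root/input1120/part-r-00000

The last command prints the output of the 20th and final iteration (the driver's paths + i resolves to input1120 when i is 20).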
The results after 20 iterations are as follows. In each line, the second number is the node's current PageRank value (the extra leading number is an artifact of reusing the reducer as the combiner), and the "_"-prefixed list is the node's outlinks:
1 0.150 0.501 _2 3 4 5 6 7 8
2 0.150 0.562 _3 4 5 6 7 8
3 0.150 0.644 _4 5 6 7 8
4 0.150 0.755 _5 6 7 8
5 0.150 0.919 _6 7 8
6 0.150 1.184 _7 8
7 0.150 1.698 _8
8 0.150 2.822 _1 2 3 4 5 6 7
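If you would rather read the final ranking back programmatically than cat the file, a small HDFS client along these lines should work. This is a sketch: the path assumes the 20-iteration run above with Hadoop's default part-r-00000 output naming, and TopRanks is just an illustrative name.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TopRanks {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        // Output directory of the 20th (final) iteration
        Path out = new Path("/user/root/input1120/part-r-00000");
        // rank -> node id, sorted from highest rank to lowest
        Map<Double, String> ranking = new TreeMap<>(Collections.reverseOrder());
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(fs.open(out)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Each line looks like: "1\t0.150 0.501 _2 3 4 5 6 7 8 ";
                // the rank is the second space-separated token after the tab
                String[] parts = line.split("\t");
                String[] vals = parts[1].split(" ");
                ranking.put(Double.parseDouble(vals[1]), parts[0]);
            }
        }
        for (Map.Entry<Double, String> e : ranking.entrySet()) {
            System.out.printf("node %s: %.3f%n", e.getValue(), e.getKey());
        }
    }
}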
That wraps up the answer to how to implement a PageRank algorithm in Hadoop. Hopefully the content above has been of some help; if you still have unanswered questions, you can follow the Tianda Cloud industry news channel to learn more.