This article explains in detail how to implement WordCount and movie rating prediction with MapReduce in Hadoop. It is shared as a reference in the hope that, after reading it, you will have a working understanding of the topic.
In MapReduce, map refers to mapping and reduce refers to reduction.
MapReduce is a programming model that processes data as key-value pairs: the map phase transforms one set of key-value pairs into another set of key-value pairs,
which the framework then passes down to the reduce phase. In reduce, all key-value pairs produced by the map tasks are aggregated, and values that share the same key are grouped together. MapReduce also sorts the keys internally before they reach the reduce phase.
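To make this concrete, here is a minimal, framework-free Java sketch of that key-value pipeline (the class name and the sample sentences are made up for illustration; the real Hadoop version follows in the next section). The map step emits one (word, 1) pair per word, the grouping step collects values by key, and the reduce step sums each group.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class KeyValueFlowSketch {
    public static void main(String[] args) {
        String[] lines = { "hello hadoop", "hello mapreduce" };
        // "map": emit one (word, 1) pair per word
        List<String[]> mapped = new ArrayList<String[]>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                mapped.add(new String[] { word, "1" });
            }
        }
        // "shuffle": group the emitted values by key
        Map<String, List<Integer>> grouped = new HashMap<String, List<Integer>>();
        for (String[] kv : mapped) {
            if (!grouped.containsKey(kv[0])) {
                grouped.put(kv[0], new ArrayList<Integer>());
            }
            grouped.get(kv[0]).add(Integer.parseInt(kv[1]));
        }
        // "reduce": fold each key's list of values into a single count
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int one : entry.getValue()) {
                sum += one;
            }
            System.out.println(entry.getKey() + "\t" + sum);
        }
    }
}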
1. WordCount
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// HdfsDAO is the custom HDFS helper class used throughout this article
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class WordCount
{
//
public static final String HDFS = "hdfs://localhost:8888";
public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");
//the custom Map class implements the "mapping" part
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
//in MapReduce, Text plays the role of String and IntWritable the role of int
//LongWritable is a data type that implements WritableComparable.
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
//override the parent class's map() method
public void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException
{
//read one line of input
String line = value.toString();
//convert the whole line to lower case
line = line.toLowerCase();
//split the line using the regular expression defined above
Matcher matcher = DELIMITER.matcher(line);
while(matcher.find()){
//convert each extracted word to Text
word.set(matcher.group());
//emit the key-value pair: the key is the word, the value is 1
context.write(word,one);
}
}
}
//the custom Combine step runs a local reduce over each mapper's output first, reducing the amount of data sent to the reducers.
public static class Combine extends Reducer <Text, IntWritable, Text, IntWritable>
{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
//iterate over all values for the same key; they arrive together in a single Iterable.
for (IntWritable line : values)
{
sum += line.get();
}
IntWritable value = new IntWritable(sum);
//write the key-value pair in the configured output format.
context.write(key, value);
}
}
public static class Reduce extends Reducer <Text, IntWritable, Text, IntWritable>
{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable line : values)
{
sum += line.get();
}
IntWritable value = new IntWritable(sum);
context.write(key, value);
}
}
public static void main(String[] args) throws Exception
{
JobConf conf = WordCount.config();
String input = "data/1.txt";
String output = HDFS + "/user/hdfs/wordcount";
//custom HDFS utility class
HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
//remove any existing output directory, otherwise the job fails with an "output directory already exists" error
hdfs.rmr(output);
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
//type of the output key
job.setOutputKeyClass(Text.class);
//type of the output value
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCount.Map.class);
job.setCombinerClass(WordCount.Combine.class);
job.setReducerClass(WordCount.Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
//output format; here a custom FileOutputFormat subclass is used, shown below.
job.setOutputFormatClass(ParseTextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static JobConf config() {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("WordCount");
conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");
// conf.set("io.sort.mb", "");
return conf;
}
}
Custom file output format
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V>{
protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized
void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
":");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
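A note on this class: getRecordWriter() reads the key-value separator from the mapred.textoutputformat.separator property and falls back to ":" when it is not set, so the WordCount job above writes lines of the form word:count. If a different separator is wanted, it can be set on the job configuration before submission; a hypothetical usage sketch:
JobConf conf = WordCount.config();
// override the ":" default that getRecordWriter() above falls back to
conf.set("mapred.textoutputformat.separator", "::");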
2. Movie rating prediction
The implementation uses the Slope One algorithm to predict ratings; the custom output format class used here is the same one shown above.
Each line of the input file has the format userId::movieId::score.
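Before looking at the MapReduce code, it may help to see the Slope One idea on its own: for a pair of movies the algorithm keeps the average rating difference over the users who rated both, and a user's known rating of one movie plus that average difference is the predicted rating for the other. The following is a small in-memory Java sketch with made-up ratings, meant only to illustrate the formula rather than the distributed implementation below.
import java.util.HashMap;
import java.util.Map;
public class SlopeOneSketch {
    public static void main(String[] args) {
        // made-up ratings: userId -> (movieId -> score)
        Map<String, Map<String, Double>> ratings = new HashMap<String, Map<String, Double>>();
        ratings.put("1", new HashMap<String, Double>());
        ratings.get("1").put("11", 5.0);
        ratings.get("1").put("12", 3.0);
        ratings.put("2", new HashMap<String, Double>());
        ratings.get("2").put("11", 4.0);
        ratings.get("2").put("12", 2.0);
        ratings.put("3", new HashMap<String, Double>());
        ratings.get("3").put("11", 4.5); // user 3 has not rated movie 12
        // average difference dev(12, 11) over the users who rated both movies
        double diffSum = 0;
        int n = 0;
        for (Map<String, Double> r : ratings.values()) {
            if (r.containsKey("11") && r.containsKey("12")) {
                diffSum += r.get("12") - r.get("11");
                n++;
            }
        }
        double dev = diffSum / n; // ((3-5) + (2-4)) / 2 = -2.0
        // Slope One prediction: user 3's rating of movie 11 plus the average difference
        double predicted = ratings.get("3").get("11") + dev; // 4.5 - 2.0 = 2.5
        System.out.println("predicted rating of movie 12 for user 3: " + predicted);
    }
}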
package main.java.org.conan.myhadoop.recommend;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class Recommend {
public static final String HDFS = "hdfs://localhost:8888";
public static final Pattern DELIMITER = Pattern.compile("[\t,]");
public static final Pattern STRING = Pattern.compile("[\t,:]");
// public final static int movieListLength = 100000;
// public static int []movieList = new int[movieListLength];
public static List movieList = new ArrayList();
public static Map userScore = new HashMap();
public static void main(String[] args) throws Exception {
Map<String, String> path = new HashMap<String, String>();
String in = "logfile/4.txt";
String out = HDFS + "/user/hdfs/recommend" + "/step5";
// path.put("data", "logfile/small.csv");
// path.put("data", "logfile/ratings.dat");
if(args.length == 2){
in = args[0];
out = HDFS + args[1];
System.out.println(out);
}
//input data path
path.put("data", in);
//Step 1 input path
path.put("Step1Input", HDFS + "/user/hdfs/recommend");
//Step 1 output path
path.put("Step1Output", path.get("Step1Input") + "/step1");
//Step 2 input path
path.put("Step2Input", path.get("Step1Output"));
//Step 2 output path
path.put("Step2Output", path.get("Step1Input") + "/step2");
//Step 3 input path
path.put("Step3Input1", path.get("data"));
// path.put("Step3Input2", "logfile/movie/movies.dat");
//Step 3 output path
path.put("Step3Output", path.get("Step1Input") + "/step3");
// path.put("Step3Input2", path.get("Step2Output"));
// path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
//
//Step 4 input path 1
path.put("Step4Input1", path.get("Step2Output"));
//Step 4 input path 2
path.put("Step4Input2", path.get("Step3Output"));
//Step 4 output path
path.put("Step4Output", path.get("Step1Input") + "/step4");
//
//Step 5 input path
path.put("Step5Input", path.get("Step4Output"));
// path.put("Step5Input2", path.get("Step3Output2"));
//Step 5 output path
path.put("Step5Output", out);
//Step 1: build each user's rating vector from the given rating file
Step1.run(path);
//Step 2: compute the item co-occurrence (rating difference) matrix from Step 1's output
Step2.run(path);
//Step 3: collect every movie that has been rated and output each user's rating for every movie, writing 0 for unrated ones
Step3.run(path);
//Step 4: predict each user's rating for every movie from the results of Step 2 and Step 3
Step4.run(path);
//Step 5: clean up the output format
Step5.run(path);
System.exit(0);
}
public static JobConf config() {
JobConf conf = new JobConf(Recommend.class);
conf.setJobName("Recommand");
conf.addResource("classpath:/hadoop/core-site.xml");
conf.addResource("classpath:/hadoop/hdfs-site.xml");
conf.addResource("classpath:/hadoop/mapred-site.xml");
// conf.set("io.sort.mb", "");
return conf;
}
}
//build the user-item rating matrix, i.e. each user's scores for the movies they rated
//each output line represents one user's ratings for all the movies that user scored
//the key is the userId and the value is movieId:score,movieId:score (a small example of this output follows the Step1 code below)
public class Step1 {
public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
private final static Text k = new Text();
private final static Text v = new Text();
@Override
public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String[] tokens = value.toString().split("::");
String itemID = tokens[1];
String pref = tokens[2];
k.set(tokens[0]);
v.set(itemID + ":" + pref);
output.collect(k, v);
}
}
public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String value= "";
int num = 0;
while (values.hasNext()) {
num++;
value += values.next();
value += ",";
if( num >= 400 ){
value = value.substring(0, value.length() - 1);
Text v = new Text(value);
output.collect(key, v);
value = "";
num = 0;
break;
}
}
if(num != 0){
value = value.substring(0, value.length() - 1);
Text v = new Text(value);
output.collect(key, v);
}
}
}
public static void run(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step1Input");
String output = path.get("Step1Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
// hdfs.rmr(output);
hdfs.rmr(input);
hdfs.mkdirs(input);
hdfs.copyFile(path.get("data"), input);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(Step1_ToItemPreMapper.class);
conf.setReducerClass(Step1_ToUserVectorReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
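To make Step1's output concrete, with made-up input lines such as
1::1193::5
1::661::3
2::1193::4
the job would emit one line per user, with the user ID as key and the comma-joined movieId:score pairs as value (TextOutputFormat separates key and value with a tab by default):
1	1193:5,661:3
2	1193:4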
//compute the item co-occurrence matrix from the result of Step 1
//algorithm-wise there is no cheap way around the nested for loops here, so while building the co-occurrence matrix a random pairing is used instead, producing one movieA:movieB difference per rating (the exact pairwise version is sketched after the Step2 code below)
public class Step2 {
public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static Text k = new Text();
private final static DoubleWritable v = new DoubleWritable();
// private final static IntWritable v = new IntWritable(1);
@Override
public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
for (int i = 1; i < tokens.length; i++) {
String itemID = tokens[i].split(":")[0];
// for (int j = 1; j < i+1; j++) {
// String itemID2 = tokens[j].split(":")[0];
// double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]);
//// if(sum<0.5) break;
//// if(sum>4.5) break;
// k.set(itemID + ":" + itemID2+":");
// v.set(sum);
// output.collect(k, v);
// k.set(itemID2 + ":" + itemID+":");
// v.set(sum);
// output.collect(k, v);
//
// }
Random random = new Random();
int j;
j = random.nextInt(tokens.length - 1) + 1;
String itemID2 = tokens[j].split(":")[0];
double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]);
k.set(itemID + ":" + itemID2+":");
v.set(sum);
output.collect(k, v);
}
}
}
public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();
@Override
public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
double sum = 0;
int count = 0;
while (values.hasNext()) {
sum += values.next().get();
count++;
}
sum = sum / count; // average rating difference for this movie pair
DecimalFormat df = new DecimalFormat("#.0000");
sum = Double.valueOf(df.format(sum));
// System.out.println(key+"---count----"+count+"-------"+sum);
result.set(sum);
output.collect(key, result);
}
}
public static void run(Map<String, String> path) throws IOException {
JobConf conf = Recommend.config();
String input = path.get("Step2Input");
String output = path.get("Step2Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(DoubleWritable.class);
conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
// conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(input));
FileOutputFormat.setOutputPath(conf, new Path(output));
RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
}
}
}
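For reference, the exact pairwise computation that the commented-out double loop was aiming at, and that the random pairing above approximates, looks like the following on plain in-memory data. This is only a sketch under assumed data structures, not a drop-in replacement for the job above: for every ordered pair of movies rated by the same user it accumulates the rating difference and then averages it, which is the standard Slope One deviation matrix.
import java.util.HashMap;
import java.util.Map;
public class DeviationSketch {
    // returns "movieA:movieB" -> { average of score(movieA) - score(movieB), number of co-rating users }
    public static Map<String, double[]> deviations(Map<String, Map<String, Double>> ratings) {
        Map<String, double[]> dev = new HashMap<String, double[]>();
        for (Map<String, Double> userRatings : ratings.values()) {
            for (Map.Entry<String, Double> a : userRatings.entrySet()) {
                for (Map.Entry<String, Double> b : userRatings.entrySet()) {
                    if (a.getKey().equals(b.getKey())) {
                        continue;
                    }
                    String key = a.getKey() + ":" + b.getKey();
                    double[] acc = dev.get(key);
                    if (acc == null) {
                        acc = new double[2];
                        dev.put(key, acc);
                    }
                    acc[0] += a.getValue() - b.getValue(); // running sum of differences
                    acc[1] += 1;                           // number of users who rated both
                }
            }
        }
        for (double[] acc : dev.values()) {
            acc[0] = acc[0] / acc[1]; // turn the sum into an average
        }
        return dev;
    }
}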
//collect every movie any user has rated, and output each user's rating for every movie, writing 0 for movies the user has not rated
//since there is no separate file listing all movies, the movie IDs have to be gathered from the data file itself
//they are kept in an in-memory list, so with a large data file every record read triggers a lookup in that list to check whether the movie has already been recorded,
//which makes the job slower and slower
//a static map also records each user's first rated movie, which later serves as the baseline when computing predictions with the item co-occurrence matrix
//(note that this static state is only shared when all tasks run in a single local JVM; a small example of Step3's output follows its code below)
public class Step3 {
public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
private final static Text k = new Text();
private final static Text v = new Text();
private String flag; // which input data set this split comes from
// private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getParent().getName();// determine which data set is being read
}
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = values.toString().split("::");
// System.out.println(flag);
// System.out.println(tokens.length);
//
// for(int i = 0;i < tokens.length;i++){
// System.out.println(tokens[i]);
// }
// collect every movie ID from the data; ideally a separate movie list file would make this membership check unnecessary
if( !Recommend.movieList.contains(tokens[1]) ){
Recommend.movieList.add(tokens[1]);
}
// if(flag.equals("movie")){
// Recommend.movieList.add(tokens[0]);
// }
// else{
k.set(tokens[0]);
v.set(tokens[1] + "," + tokens[2]);
context.write(k, v);
// }
}
}
public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> {
private final static Text v = new Text();
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map userMovieList = new HashMap();
for(Text line : values){
String[] tokens = Recommend.DELIMITER.split(line.toString());
userMovieList.put(tokens[0], tokens[1]);
}
for(int i = 0; i < Recommend.movieList.size();i++){
// System.out.println("key---->" + key);
// System.out.println("value---->" + v);
if(!userMovieList.containsKey(Recommend.movieList.get(i))){
v.set(Recommend.movieList.get(i) + "," + 0);
context.write(key, v);
}
else{
v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i)));
context.write(key, v);
}
}
}
}
public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = Recommend.config();
String input1 = path.get("Step3Input1");
// String input2 = path.get("Step3Input2");
String output = path.get("Step3Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
Job job = new Job(conf);
job.setJarByClass(Step3.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class);
job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(input1));
FileOutputFormat.setOutputPath(job, new Path(output));
do{
job.waitForCompletion(false);
}while(!job.isComplete());
}
}
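Continuing the made-up example from Step1, if the data file only contains users 1 and 2 and movies 1193 and 661, Step3 outputs one line per (user, movie) pair, for instance:
1	1193,5
1	661,3
2	1193,4
2	661,0
The 0 marks a movie that user 2 never rated; these zeros are exactly the entries Step4 replaces with predicted scores.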
//predict each user's rating for every movie from the results of Step 2 and Step 3
//read the Step 3 output and, whenever a user's rating for a movie is 0,
//combine the user's recorded baseline rating with the item co-occurrence matrix from Step 2 to compute a predicted rating (a small worked example follows the Step4 code below)
public class Step4 {
public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
private String flag;// co-occurrence matrix (step2) or rating matrix (step3)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getParent().getName();// determine which data set is being read
// System.out.println(flag);
}
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
if (flag.equals("step2")) {// 同现矩阵
// System.out.println(tokens.length);
// for(int i = 0; i < tokens.length;i++){
// System.out.println(tokens[i]);
// }
// String[] v1 = tokens[0].split(":");
// String itemID1 = v1[0];
// String itemID2 = v1[1];
// String num = tokens[1];
//
// Text k = new Text(itemID1);
// Text v = new Text("A:" + itemID2 + "," + num);
String[] v1 = tokens[0].split(":");
Text k = new Text(v1[0]);
Text v = new Text("M:" + v1[1] + "," + tokens[1]);
context.write(k, v);
// System.out.println(k.toString() + " " + v.toString());
} else if (flag.equals("step3")) {// rating matrix
// System.out.println(tokens.length);
// for(int i = 0; i < tokens.length;i++){
// System.out.println(tokens[i]);
// }
// String[] v2 = tokens[1].split(",");
//// String itemID = tokens[0];
//// String userID = v2[0];
//// String pref = v2[1];
if(Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])){
Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]);
}
////
Text k = new Text(tokens[1]);
Text v = new Text("U:" + tokens[0] + "," + tokens[2]);
context.write(k, v);
// System.out.println(k.toString() + " " + v.toString());
}
}
}
public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// System.out.println("key--->" + key);
Map movie = new HashMap();
Text k;
Text v;
//Map user = new HashMap();
List list = new ArrayList();
for (Text line : values) {
list.add(line.toString());
// System.out.println(line.toString());
String[] tokens = Recommend.STRING.split(line.toString());
if(tokens[0].equals("M")){
// System.out.println(tokens[1]);
// System.out.println(tokens[2]);
movie.put(tokens[1], tokens[2]);
}
}
for(int i = 0;i < list.size();i++) {
String[] tokens = Recommend.STRING.split((String) list.get(i));
//System.out.println(tokens[0]);
if(tokens[0].equals("U")){
if(Double.parseDouble(tokens[2]) == 0 ){
String userScore = (String) Recommend.userScore.get(tokens[1]);
String[] temps = Recommend.STRING.split(userScore);
k = new Text(key);
// System.out.println("useid"+tokens[1]+"movie score"+temps[1]);
// System.out.println("movie id"+movie.get(temps[0]));
double temp = 0;
if(movie.get(temps[0]) != null){
temp = Double.parseDouble((String) movie.get(temps[0])); // average difference between this movie and the user's baseline movie
}
double score = Double.parseDouble(temps[1])+temp;
v = new Text(tokens[1] + "," + score);
}
else{
k = new Text(key);
v = new Text(tokens[1] + "," + tokens[2]);
}
// System.out.println("key-->" + k);
// System.out.println("value-->" + v);
context.write(k, v);
}
}
// System.out.println(key.toString() + ":");
//
// Map<String, String> mapA = new HashMap<String, String>();
// Map<String, String> mapB = new HashMap<String, String>();
//
// for (Text line : values) {
// String val = line.toString();
// System.out.println(val);
//
// if (val.startsWith("A:")) {
// String[] kv = Recommend.DELIMITER.split(val.substring(2));
// mapA.put(kv[0], kv[1]);
//
// } else if (val.startsWith("B:")) {
// String[] kv = Recommend.DELIMITER.split(val.substring(2));
// mapB.put(kv[0], kv[1]);
//
// }
// }
//
// double result = 0;
// Iterator<String> iter = mapA.keySet().iterator();
// while (iter.hasNext()) {
// String mapk = iter.next();// itemID
//
// int num = Integer.parseInt(mapA.get(mapk));
// Iterator<String> iterb = mapB.keySet().iterator();
// while (iterb.hasNext()) {
// String mapkb = iterb.next();// userID
// double pref = Double.parseDouble(mapB.get(mapkb));
// result = num * pref;// 矩阵乘法相乘计算
//
// Text k = new Text(mapkb);
// Text v = new Text(mapk + "," + result);
// context.write(k, v);
// System.out.println(k.toString() + " " + v.toString());
// }
// }
}
}
public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = Recommend.config();
String input1 = path.get("Step4Input1");
String input2 = path.get("Step4Input2");
String output = path.get("Step4Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
Job job = new Job(conf);
job.setJarByClass(Step4.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class);
job.setReducerClass(Step4.Step4Update_AggregateReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
FileOutputFormat.setOutputPath(job, new Path(output));
do{
job.waitForCompletion(false);
}while(!job.isComplete());
}
}
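A worked example with hypothetical numbers: suppose user 7's baseline entry in Recommend.userScore is movie 1193 with score 4.0 (the first movie that user rated), and Step2 produced the key 2355:1193: with an average difference of 0.5. When the reducer processes key 2355 and reaches user 7's 0 rating, it computes 4.0 + 0.5 = 4.5 and writes 2355 as the key with 7,4.5 as the value, which Step5 then regroups under the user ID for the final output.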
//normalize the format of the final output.
public class Step5 {
public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
// System.out.println("run");
// System.out.println("key--->" + key);
String[] tokens = Recommend.DELIMITER.split(values.toString());
Text k = new Text(tokens[1]);
Text v;
if(Double.parseDouble(tokens[2]) == 0){
v = new Text(tokens[0] + "::");
}
else{
v = new Text(tokens[0] + "::" + tokens[2]);
}
context.write(k, v);
}
}
public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text line : values) {
Text k = new Text(key.toString());
context.write(k, line);
}
}
}
public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = Recommend.config();
String input = path.get("Step5Input");
String output = path.get("Step5Output");
HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);
Job job = new Job(conf);
job.setJarByClass(Step5.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class);
job.setReducerClass(Step5.Step5_AggregateReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(ParseTextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
do{
job.waitForCompletion(false);
}while(!job.isComplete());
System.out.println("---------------------end--------------------");
}
}
That covers how to implement WordCount and movie rating prediction with MapReduce in Hadoop. Hopefully the material above is useful and helps you learn a bit more. If you found the article worthwhile, feel free to share it with others.