hadoop中如何利用mapreduce实现wordcount和电影评分预测
更新:HHH   时间:2023-1-7


这篇文章将为大家详细讲解有关hadoop中如何利用mapreduce实现wordcount和电影评分预测,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

mapreduce中map指映射,reduce指的是归约。

mapreduce是一个以key-value形式来处理数据的编程模型,它使用map将一组key-value映射为另一组key-value,

通过底层传递给reduce,在reduce中,它将所有map过程传递过来的key-value进行归约,相同的key值,value值会放在一起。mapreduce内部还会对reduce过程中的key值进行一次排序。

一.WordCount

public class WordCount
{
    //
    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");
    
    //自定义Map类型执行  "映射"这一部分
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        //mapreduce中,Text相当于String类型,IntWritable相当于Int类型
        //LongWritable是实现了WritableComparable的一个数据类型。
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        @Override
        //重写父类map()函数
        public void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException
        {
            //读取一行数据
            String line = value.toString();

            //将该行字符全部变为小写
            line = line.toLowerCase();
            //根据定义好的正则表达式拆分一行字符串。
            Matcher matcher = DELIMITER.matcher(line);
            while(matcher.find()){
                //将分解的一个个单词类型转化为Text。
                word.set(matcher.group());
                //将相应的key-value值传入。key值为单词,value值为1.
                context.write(word,one);
            }
        }
    }
    
    //自定义Combine过程先对本地进行的map进行一次reduce过程,减少传递给主机的数据量.
    public static class Combine extends Reducer <Text, IntWritable, Text, IntWritable>
    {
         @Override
         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            //遍历同一个key值的所有value,所有的value放在同一个Iterable中。
            for (IntWritable line : values)
            {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            //将key-value按照指定的输出格式输出。
            context.write(key, value);
        }
    }
    
    public static class Reduce extends Reducer <Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
           for (IntWritable line : values)
           {
               sum += line.get();
           }
           IntWritable value = new IntWritable(sum);
           context.write(key, value);
            

            
       }
    }
    public static void main(String[] args) throws Exception
    {
        JobConf conf = WordCount.config();
        String input = "data/1.txt";
        String output = HDFS + "/user/hdfs/wordcount";
        //自定义HDFS文件操作工具类
        HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
        //移除存在的文件否则会报文件生成文件已存在的错误
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        
        //设置输出的key值类型
        job.setOutputKeyClass(Text.class);

        //设置输出的value值类型
        job.setOutputValueClass(IntWritable.class);
        
        job.setMapperClass(WordCount.Map.class);
        job.setCombinerClass(WordCount.Combine.class);
        job.setReducerClass(WordCount.Reduce.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        //设置输出的格式,这里使用的是自定义的FileOutputFormat类,见下文。
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

       
      
        
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    
    public static JobConf config() {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }

    


    
}

自定义文件输出格式

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;



public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V>{
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
          try {
            newline = "\n".getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
          this.out = out;
          try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
          } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
          }
        }

        public LineRecordWriter(DataOutputStream out) {
          this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special
         * case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
          if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
          } else {
            out.write(o.toString().getBytes(utf8));
          }
        }

        public synchronized void write(K key, V value)
          throws IOException {

          boolean nullKey = key == null || key instanceof NullWritable;
          boolean nullValue = value == null || value instanceof NullWritable;
          if (nullKey && nullValue) {
            return;
          }
          if (!nullKey) {
            writeObject(key);
          }
          if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
          }
          if (!nullValue) {
            writeObject(value);
          }
          out.write(newline);
        }

        public synchronized 
        void close(TaskAttemptContext context) throws IOException {
          out.close();
        }
      }

      public RecordWriter<K, V> 
             getRecordWriter(TaskAttemptContext job
                             ) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
                                           ":");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
          Class<? extends CompressionCodec> codecClass = 
            getOutputCompressorClass(job, GzipCodec.class);
          codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
          extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
          FSDataOutputStream fileOut = fs.create(file, false);
          return new LineRecordWriter<K, V>(new DataOutputStream
                                            (codec.createOutputStream(fileOut)),
                                            keyValueSeparator);
        }
      }
        
 }

二.电影评分预测

整个算法的实现中使用了slope one算法来预测评分,此处自定义的输出类与上文一致。

输入文件格式为userId::movieId::score

package  main.java.org.conan.myhadoop.recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;
import  main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    
    public static final Pattern STRING = Pattern.compile("[\t,:]");
    
//    public final static int movieListLength = 100000;
//    public static int []movieList = new int[movieListLength];
    public static List movieList = new ArrayList();
    
    public static Map userScore = new HashMap();

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";
        
//       path.put("data", "logfile/small.csv");
        
//       path.put("data", "logfile/ratings.dat");
        if(args.length == 2){
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }
        //设置数据输入路径
        path.put("data", in);
        
        //设置第一步输入文件路径
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        
        //设置第一步结果输出路径
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        
        //设置第二步输入文件路径
        path.put("Step2Input", path.get("Step1Output"));
        
        //设置第二步结果输出路径
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        
        //设置第三步输入文件路径
        path.put("Step3Input1", path.get("data"));
//        path.put("Step3Input2", "logfile/movie/movies.dat");
        //设置第三步结果输出路径
        path.put("Step3Output", path.get("Step1Input") + "/step3");
//        path.put("Step3Input2", path.get("Step2Output"));
//        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
//        
        //设置第四步输入文件路径1
        path.put("Step4Input1", path.get("Step2Output"));
        
        //设置第四步输入文件路径2
        path.put("Step4Input2", path.get("Step3Output"));
        //设置第四步结果输出路径
        path.put("Step4Output", path.get("Step1Input") + "/step4");
//        
        //设置第五步输入文件路径
        path.put("Step5Input", path.get("Step4Output"));
//        path.put("Step5Input2", path.get("Step3Output2"));
        //设置第五步结果输出路径
        path.put("Step5Output", out);
        
        //第一步,根据给出的用户评分文件,求出每个用户对物品的评分矩阵
        Step1.run(path);
        
        //根据第一步的输出结果计算物品评分的同现矩阵
        Step2.run(path);
        
        //获取所有用户评过分的电影,并输出每位用户对每部电影的评分,未评过则记为0
        Step3.run(path);
        
        //根据第二步与第三步的结果计算出每位用户对每部电影的评分
        Step4.run(path);
        
        //整理输出格式。
        Step5.run(path);
        
        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }

}
//求出用户对物品的评分矩阵,即得出用户对电影 的评分矩阵
//每一行数据代表一个用户对所有打分电影的结果
//key值为userId, value值为movieID:score,movieId:score


public class Step1 {

    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String[] tokens = value.toString().split("::");
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(tokens[0]);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String value= "";
            int num = 0;
            while (values.hasNext()) {
                num++;
                value += values.next();
                value += ",";
                if( num >= 400 ){
                    value = value.substring(0, value.length() - 1);
                    Text v = new Text(value);
                    output.collect(key, v);
                    value = "";
                    num = 0;
                    break;
                }

            }
            if(num != 0){
                value = value.substring(0, value.length() - 1);
                Text v = new Text(value);
                output.collect(key, v);
            }
            
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
//        hdfs.rmr(output);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}
//根据第一步的 结果求出物品的同现矩阵
//算法方面,没有太好的算法处理两个for循环,就在求物品同现矩阵的时候使用一个随机数,得出一个movieA:movieB的结果

public class Step2 {
    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static Text k = new Text();
        private final static DoubleWritable v = new DoubleWritable();
//        private final static IntWritable v = new IntWritable(1);


        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
//                for (int j = 1; j < i+1; j++) {
//                    String itemID2 = tokens[j].split(":")[0];
//                    double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]); 
////                    if(sum<0.5) break;
////                    if(sum>4.5) break;
//                    k.set(itemID + ":" + itemID2+":");
//                    v.set(sum);
//                    output.collect(k, v);
//                    k.set(itemID2 + ":" + itemID+":");
//                    v.set(sum);
//                    output.collect(k, v);
//
//                }
                Random random = new Random();
                int j;
                j = random.nextInt(tokens.length - 1) + 1;
                String itemID2 = tokens[j].split(":")[0];
                double sum =Double.parseDouble(tokens[i].split(":")[1])-Double.parseDouble(tokens[j].split(":")[1]);
                k.set(itemID + ":" + itemID2+":");
                v.set(sum);
                output.collect(k, v);
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                sum += values.next().get();
                count++;
            }
            sum = sum/count*1.0;
            DecimalFormat df = new DecimalFormat("#.0000");
            sum = Double.valueOf(df.format(sum));
//            System.out.println(key+"---count----"+count+"-------"+sum);
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step2Input"); 
        String output = path.get("Step2Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
//        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
//取所有用户评过分的电影,并输出每位用户对每部电影的评分,未评过则记为0
//此处因为没有一个专门的电影记录为文件,所以就只能从数据文件中获取到所有的电影ID。
//并将所有的电影ID维持在一个线性表中,但是当数据文件过大时,每次读取一条数据都要从线性表中判断该电影是否已经记录
//,导致效率会越来越低
//并且维持一个静态map记录每个用户对的第一部评过分的电影,以此作为标准,使用物品同现矩阵进行计算
public class Step3 {

    public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();
        private String flag;    //判断读取的数据集
//        private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集
            
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = values.toString().split("::");
//            System.out.println(flag);
//            System.out.println(tokens.length);
//            
//            for(int i = 0;i < tokens.length;i++){
//                System.out.println(tokens[i]);
//            }
            
//            获取所有的电影数据,应该有一个文件记录所有的电影信息,就不用判断是否包含直接添加
            if( !Recommend.movieList.contains(tokens[1]) ){
                Recommend.movieList.add(tokens[1]);
            }
            
//            if(flag.equals("movie")){
//                Recommend.movieList.add(tokens[0]);
//            }
//            else{
                k.set(tokens[0]);
                v.set(tokens[1] + "," + tokens[2]);
                context.write(k, v);
//            }
            
        }
    }

    public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            Map userMovieList = new HashMap();
            for(Text line : values){
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                userMovieList.put(tokens[0], tokens[1]);
            }
            for(int i = 0; i < Recommend.movieList.size();i++){
//                System.out.println("key---->" + key);
//                System.out.println("value---->" + v);
                if(!userMovieList.containsKey(Recommend.movieList.get(i))){
                    v.set(Recommend.movieList.get(i) + "," + 0);
                    context.write(key, v);
                }
                else{
                    v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i)));
                    context.write(key, v);
                }
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step3Input1");
//        String input2 = path.get("Step3Input2");

        String output = path.get("Step3Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step3.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1));
        FileOutputFormat.setOutputPath(job, new Path(output));
        

        do{
            job.waitForCompletion(false);
        }while(!job.isComplete());
    }

}
//根据第二步与第三步的结果计算出每位用户对每部电影的评分
//根据第三步结果,读取数据,当发现用户对某部电影的评分为0的时候,
//根据第二步得到的map获取数据和物品同现矩阵计算得出用户对电影的评分
public class Step4 {

    public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// A同现矩阵 or B评分矩阵

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集

//             System.out.println(flag);
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            
            
            
            if (flag.equals("step2")) {// 同现矩阵
//                System.out.println(tokens.length);
//                for(int i = 0; i < tokens.length;i++){
//                    System.out.println(tokens[i]);
//                }
//                String[] v1 = tokens[0].split(":");
//                String itemID1 = v1[0];
//                String itemID2 = v1[1];
//                String num = tokens[1];
//
//                Text k = new Text(itemID1);
//                Text v = new Text("A:" + itemID2 + "," + num);
                String[] v1 = tokens[0].split(":");
                
                
                
                
                Text k = new Text(v1[0]);
                
                Text v = new Text("M:" + v1[1] + "," + tokens[1]);
                
                
                context.write(k, v);
//                 System.out.println(k.toString() + "  " + v.toString());

            } else if (flag.equals("step3")) {// 评分矩阵
//                System.out.println(tokens.length);
//                for(int i = 0; i < tokens.length;i++){
//                    System.out.println(tokens[i]);
//                }
                
//                String[] v2 = tokens[1].split(",");
////                String itemID = tokens[0];
////                String userID = v2[0];
////                String pref = v2[1];
                
                if(Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])){
                    Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]);
                }
////
                Text k = new Text(tokens[1]);
                
                Text v = new Text("U:" + tokens[0] + "," + tokens[2]);

                context.write(k, v);
                // System.out.println(k.toString() + "  " + v.toString());
            }
        }

    }

    public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//            System.out.println("key--->" + key);
            Map movie = new HashMap();
            Text k;
            Text v;
            //Map user = new HashMap();
            List list = new ArrayList();
            for (Text line : values) {
                list.add(line.toString());
//                System.out.println(line.toString());
                String[] tokens = Recommend.STRING.split(line.toString());
                if(tokens[0].equals("M")){
//                    System.out.println(tokens[1]);
//                    System.out.println(tokens[2]);
                    movie.put(tokens[1], tokens[2]);
                }
            }
            
            for(int i = 0;i < list.size();i++) {
                
                String[] tokens = Recommend.STRING.split((String) list.get(i));
                //System.out.println(tokens[0]);
                if(tokens[0].equals("U")){
                    if(Double.parseDouble(tokens[2]) == 0 ){
                        String userScore = (String) Recommend.userScore.get(tokens[1]);
                        String[] temps =  Recommend.STRING.split(userScore);
                        k = new Text(key);
//                        System.out.println("useid"+tokens[1]+"movie score"+temps[1]);
//                        System.out.println("movie id"+movie.get(temps[0]));
                        double temp = 0;
                        if(movie.get(temps[0]) != null){
                            Double.parseDouble((String) movie.get(temps[0]));
                        }
                        
                        double score = Double.parseDouble(temps[1])+temp;
                        
                        v = new Text(tokens[1] + "," + score);

                    }
                    else{
                        k = new Text(key);
                        v = new Text(tokens[1] + "," + tokens[2]);
                        
                        
                    }
//                    System.out.println("key-->" + k);
//                    System.out.println("value-->" + v);
                    context.write(k, v);
                }
                
            }
            
            
            
//            System.out.println(key.toString() + ":");
//
//            Map<String, String> mapA = new HashMap<String, String>();
//            Map<String, String> mapB = new HashMap<String, String>();
//
//            for (Text line : values) {
//                String val = line.toString();
//                System.out.println(val);
//
//                if (val.startsWith("A:")) {
//                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
//                    mapA.put(kv[0], kv[1]);
//
//                } else if (val.startsWith("B:")) {
//                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
//                    mapB.put(kv[0], kv[1]);
//
//                }
//            }
//
//            double result = 0;
//            Iterator<String> iter = mapA.keySet().iterator();
//            while (iter.hasNext()) {
//                String mapk = iter.next();// itemID
//
//                int num = Integer.parseInt(mapA.get(mapk));
//                Iterator<String> iterb = mapB.keySet().iterator();
//                while (iterb.hasNext()) {
//                    String mapkb = iterb.next();// userID
//                    double pref = Double.parseDouble(mapB.get(mapkb));
//                    result = num * pref;// 矩阵乘法相乘计算
//
//                    Text k = new Text(mapkb);
//                    Text v = new Text(mapk + "," + result);
//                    context.write(k, v);
//                    System.out.println(k.toString() + "  " + v.toString());
//                }
//            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class);
        job.setReducerClass(Step4.Step4Update_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));
        

        do{
            job.waitForCompletion(false);
        }while(!job.isComplete());
        
    }

}
//对最后的数据输出格式做一遍规范。
public class Step5 { 
 
    public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> { 
 
 
 
        @Override 
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { 
//            System.out.println("run"); 
//            System.out.println("key--->" + key); 
            String[] tokens = Recommend.DELIMITER.split(values.toString()); 
            Text k = new Text(tokens[1]); 
            Text v; 
            if(Double.parseDouble(tokens[2]) == 0){ 
                v = new Text(tokens[0] + "::"); 
            } 
            else{ 
                v = new Text(tokens[0] + "::" + tokens[2]); 
            } 
            context.write(k, v); 
        } 
 
    } 
 
    public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> { 
 
        @Override 
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
            for (Text line : values) { 
                Text k = new Text(key.toString()); 
                context.write(k, line); 
            } 
        } 
    } 
 
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException { 
        JobConf conf = Recommend.config(); 
 
        String input = path.get("Step5Input"); 
        String output = path.get("Step5Output"); 
 
        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 
        hdfs.rmr(output); 
 
        Job job = new Job(conf); 
        job.setJarByClass(Step5.class); 
 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
 
        job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class); 
        job.setReducerClass(Step5.Step5_AggregateReducer.class); 
 
        job.setInputFormatClass(TextInputFormat.class); 
        job.setOutputFormatClass(ParseTextOutputFormat.class); 
 
        FileInputFormat.setInputPaths(job, new Path(input)); 
        FileOutputFormat.setOutputPath(job, new Path(output)); 
 
        do{ 
            job.waitForCompletion(false); 
        }while(!job.isComplete()); 
        System.out.println("---------------------end--------------------"); 
    } 
 
}

关于hadoop中如何利用mapreduce实现wordcount和电影评分预测就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

返回云计算教程...