HADOOP August 20, 2018

Hadoop Recommendation Algorithms: Content-Based Collaborative Filtering


Algorithm idea: recommend to a user other items whose content is similar to the items the user has liked before.

Item feature modeling (Item Profile)

Algorithm steps

1 Build the Item Profile matrix

2 Build the Item-User rating matrix

3 Item-User rating matrix * Item Profile = User Profile

4 Compute the cosine similarity between the Item Profile and the User Profile (a minimal in-memory sketch of these steps follows below)
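
Before looking at the MapReduce version, here is a minimal in-memory sketch of steps 2 to 4. The class name, the tiny hard-coded matrices, and the console output are illustrative assumptions only; they are not part of the MapReduce code below.

import java.util.Arrays;

//Minimal sketch (illustrative only): userProfile = ratings x itemProfile,
//then cosine(item profile row, user profile row) gives the recommendation score.
public class ContentCFSketch {
    //Hypothetical data: 2 users x 3 items of ratings, 3 items x 3 features (A, B, C) of profiles
    static double[][] ratings = {{5, 0, 2}, {0, 4, 1}};
    static double[][] itemProfile = {{1, 1, 0}, {0, 1, 0}, {1, 0, 1}};

    public static void main(String[] args) {
        //Step 3: User Profile = Item-User rating matrix * Item Profile
        double[][] userProfile = multiply(ratings, itemProfile);
        System.out.println("User Profile: " + Arrays.deepToString(userProfile));

        //Step 4: cosine similarity between every item profile and every user profile
        for (int item = 0; item < itemProfile.length; item++) {
            for (int user = 0; user < userProfile.length; user++) {
                double cos = cosine(itemProfile[item], userProfile[user]);
                System.out.printf("user %d, item %d -> %.2f%n", user + 1, item + 1, cos);
            }
        }
    }

    static double[][] multiply(double[][] a, double[][] b) {
        double[][] c = new double[a.length][b[0].length];
        for (int i = 0; i < a.length; i++)
            for (int k = 0; k < b.length; k++)
                for (int j = 0; j < b[0].length; j++)
                    c[i][j] += a[i][k] * b[k][j];
        return c;
    }

    static double cosine(double[] x, double[] y) {
        double num = 0, normX = 0, normY = 0;
        for (int i = 0; i < x.length; i++) {
            num += x[i] * y[i];
            normX += x[i] * x[i];
            normY += y[i] * y[i];
        }
        return (normX == 0 || normY == 0) ? 0 : num / (Math.sqrt(normX) * Math.sqrt(normY));
    }
}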

Code implementation
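
The original post does not show the two input files. From the way the mappers parse them, each matrix is stored as sparse text: one row per line, in the form rowIndex<TAB>column_value,column_value,... The lines below are hypothetical examples in that layout (the same toy data as the sketch above): item rows with feature columns for contentCF.txt, and user rows with item columns for contentUserCF.txt.

contentCF.txt (Item Profile: rows = items, columns = features)

1	A_1,B_1
2	B_1
3	A_1,C_1

contentUserCF.txt (Item-User rating matrix: rows = users, columns = items)

U1	1_5,3_2
U2	2_4,3_1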

Step1

Mapper1

package org.hadoop.mrs.contentCF.step1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Transpose the matrix: emit (column, row_value) so the reducer can regroup the
 * entries by column.
 * Created by shirukai on 2017/11/22.
 */
public class Mapper1 extends Mapper<LongWritable,Text,Text,Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Each input line: rowIndex<TAB>column_value,column_value,...
        String[] mapLine = value.toString().split("\t");
        String mapIndex = mapLine[0];
        String[] keyAndValues = mapLine[1].split(",");
        for (String keyAndValue : keyAndValues) {
            String mapKey = keyAndValue.split("_")[0];
            String mapValue = keyAndValue.split("_")[1];
            //Swap row and column: key = original column, value = originalRow_value
            outKey.set(mapKey);
            outValue.set(mapIndex + "_" + mapValue);
            context.write(outKey, outValue);
        }
    }
}

Reducer1

package org.hadoop.mrs.contentCF.step1;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Concatenate the transposed entries of each column into a single output line.
 * Created by shirukai on 2017/11/22.
 */
public class Reducer1 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values){
            sb.append(value+",");
        }
        String result = null;
        if (sb.toString().endsWith(",")){
            result = sb.substring(0,sb.length()-1);
        }
        //outKey: row of the transposed matrix; outValue: comma-separated column_value pairs
        outKey.set(key);
        outValue.set(result);
        context.write(outKey,outValue);
    }
}

MR1

package org.hadoop.mrs.contentCF.step1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;

import java.io.IOException;

/**
 *
 * Created by shirukai on 2017/11/22.
 */
public class MR1 {
    private static Logger logger = Logger.getLogger("MR1");
    //Relative path of the input file
    private static String inPath = "/contentCF/step1/step1_input/contentCF.txt";
    //Relative path of the output directory
    private static String outPath = "/contentCF/step1/step1_output";

    public static int run(){
        try {
            Configuration conf = Conf.get();
            //Create a job instance
            Job job = Job.getInstance(conf,"step1");

            //Set the job's main class
            job.setJarByClass(MR1.class);
            //Set the job's Mapper and Reducer classes
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);

            //Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)){
                FileInputFormat.addInputPath(job,inputPath);
            }
            Path outputPath = new Path(outPath);
            //Delete any previous output so the job can run again
            fs.delete(outputPath,true);

            FileOutputFormat.setOutputPath(job,outputPath);
            return job.waitForCompletion(true)?1:-1;
        }catch (IOException e){
            logger.error(e.getMessage());
        }catch (ClassNotFoundException e){
            logger.error(e.getMessage());
        }catch (InterruptedException e){
            logger.error(e.getMessage());
        }
        return -1;
    }
}
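
With the hypothetical contentCF.txt above, step 1 simply transposes the matrix, so its output would have one line per feature with item_value entries (the order of entries within a line depends on the reducer's iteration order):

A	1_1,3_1
B	1_1,2_1
C	3_1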

Step2

Mapper2

package org.hadoop.mrs.contentCF.step2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * Multiply the Item-User rating matrix by the cached transposed Item Profile
 * to build the User Profile matrix.
 * Created by shirukai on 2017/11/13.
 */
public class Mapper2 extends Mapper<LongWritable,Text,Text,Text> {
    private static Logger logger = Logger.getLogger("Mapper2");
    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();
    private DecimalFormat df = new DecimalFormat("0.00");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        //Read the cached right-hand matrix (step 1 output) into a List<String>
        FileReader fr = new FileReader("cache2");
        BufferedReader br = new BufferedReader(fr);
        //Each line looks like:  A    6_5,4_3,2_10,1_2
        String line = null;
        while ((line = br.readLine()) != null) {
            cacheList.add(line);
        }
        br.close();
        fr.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] mapLine = value.toString().split("\t");
        String mapIndex = mapLine[0];
        String[] mapKeyAndValues = mapLine[1].split(",");

        //Iterate over the cached rows (the transposed Item Profile)
        for (String cache : cacheList) {

            Double result = 0D;
            String[] cacheLine = cache.split("\t");
            String cacheIndex = cacheLine[0];
            String[] cacheKeyAndValues = cacheLine[1].split(",");

            //Iterate over the entries of the current input row
            for (String mapKeyAndValue : mapKeyAndValues) {
                String mapKey = mapKeyAndValue.split("_")[0];
                String mapSimilarity = mapKeyAndValue.split("_")[1];
                //Iterate over the entries of the cached row
                for (String cacheKeyAndValue : cacheKeyAndValues) {
                    String cacheKey = cacheKeyAndValue.split("_")[0];
                    String cacheSimilarity = cacheKeyAndValue.split("_")[1];
                    if (mapKey.equals(cacheKey)) {
                        result += Double.valueOf(mapSimilarity) * Double.valueOf(cacheSimilarity);
                    }
                }
            }
            if (result == 0) {
                continue;
            }
            outKey.set(mapIndex);
            outValue.set(cacheIndex + "_" + df.format(result));
            context.write(outKey, outValue);
        }
    }
}

Reducer2

package org.hadoop.mrs.contentCF.step2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Concatenate the values for each key into one comma-separated line
 * (one row of the User Profile matrix).
 * Created by shirukai on 2017/11/13.
 */
public class Reducer2 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value + ",");
        }
        String result = null;
        if (sb.toString().endsWith(",")) {
            result = sb.substring(0, sb.length() - 1);
        }
        outKey.set(key);
        outValue.set(result);
        context.write(outKey, outValue);
    }
}

MR2

package org.hadoop.mrs.contentCF.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;

import java.net.URI;

/**
 * Created by shirukai on 2017/11/13.
 */
public class MR2 {
    private static Logger logger = Logger.getLogger("MR2");
    //Relative path of the input file (Item-User rating matrix)
    private static String inPath = "/contentCF/step2/step2_input/contentUserCF.txt";
    //Relative path of the output directory
    private static String outPath = "/contentCF/step2/step2_output";

    //Use the transposed Item Profile output by step 1 as the distributed cache
    private static String cache = "/contentCF/step1/step1_output/part-r-00000";

    public static int run(){
        try {
            //Create the job configuration
            Configuration conf = Conf.get();
            //Create a job instance
            Job job = Job.getInstance(conf,"step2");

            //Add the step 1 output to the distributed cache, symlinked as "cache2"
            job.addCacheFile(new URI(cache+"#cache2"));
            //Set the job's main class
            job.setJarByClass(MR2.class);
            //Set the job's Mapper and Reducer classes
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);

            //Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)){
                FileInputFormat.addInputPath(job,inputPath);
            }
            Path outputPath = new Path(outPath);
            //Delete any previous output so the job can run again
            fs.delete(outputPath,true);

            FileOutputFormat.setOutputPath(job,outputPath);
            return job.waitForCompletion(true)?1:-1;
        }catch (Exception e){
            logger.error(e.getMessage());
        }
        return -1;

    }
}
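
Continuing the same hypothetical data, step 2 multiplies the rating matrix by the Item Profile, so its output would contain one User Profile line per user with feature_weight entries (again, entry order may vary). For example, U1's weight for feature A is 5×1 (item 1) + 2×1 (item 3) = 7:

U1	A_7.00,B_5.00,C_2.00
U2	A_1.00,B_4.00,C_1.00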

Step3

Mapper3

package org.hadoop.mrs.contentCF.step3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * Compute the cosine similarity between each item's profile (map input) and each
 * user's profile (the step 2 output read from the distributed cache).
 * Created by shirukai on 2017/11/12.
 */
public class Mapper3 extends Mapper<LongWritable, Text, Text, Text> {
    private static Logger logger = Logger.getLogger("Mapper3");
    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();
    private DecimalFormat df = new DecimalFormat("0.00");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        //Read the cached User Profile matrix (step 2 output) into a List<String>
        FileReader fr = new FileReader("cache3");
        BufferedReader br = new BufferedReader(fr);
        String line = null;
        while ((line = br.readLine()) != null) {
            cacheList.add(line);
        }
        br.close();
        fr.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] mapLine = value.toString().split("\t");
        String mapIndex = mapLine[0];
        String[] mapKeyAndValues = mapLine[1].split(",");

        //Iterate over the cached User Profile rows (one per user)
        for (String cache : cacheList) {

            Double num = 0D;      //numerator: dot product
            Double cacheDen = 0D; //squared norm of the cached (user) vector
            Double mapDen = 0D;   //squared norm of the input (item) vector
            Double den;           //denominator
            Double cos = 0D;      //final cosine similarity

            String[] cacheLine = cache.split("\t");
            String cacheIndex = cacheLine[0];
            String[] cacheKeyAndValues = cacheLine[1].split(",");
            for (String cacheKeyAndValue : cacheKeyAndValues) {
                String cacheScore = cacheKeyAndValue.split("_")[1];
                cacheDen += Double.valueOf(cacheScore) * Double.valueOf(cacheScore);
            }
            //Iterate over the feature entries of the current item row
            for (String mapKeyAndValue : mapKeyAndValues) {
                //The keys here are feature (column) ids
                String mapFeatureId = mapKeyAndValue.split("_")[0];
                String mapScore = mapKeyAndValue.split("_")[1];
                //Iterate over the feature entries of the cached user row
                for (String cacheKeyAndValue : cacheKeyAndValues) {
                    String cacheFeatureId = cacheKeyAndValue.split("_")[0];
                    String cacheScore = cacheKeyAndValue.split("_")[1];
                    if (mapFeatureId.equals(cacheFeatureId)) {
                        //Numerator: sum over matching features
                        num += Double.valueOf(mapScore) * Double.valueOf(cacheScore);
                    }
                }

                mapDen += Double.valueOf(mapScore) * Double.valueOf(mapScore);
            }
            den = Math.sqrt(mapDen) * Math.sqrt(cacheDen);
            cos = num / den;
            //Skip pairs with no overlapping features
            if (num == 0) {
                continue;
            }
            outKey.set(cacheIndex);
            outValue.set(mapIndex + "_" + df.format(cos));
            context.write(outKey, outValue);
        }
    }
}

Reducer3

package org.hadoop.mrs.contentCF.step3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Concatenate the values for each key into one comma-separated line
 * (one row of item similarities per user).
 * Created by shirukai on 2017/11/12.
 */
public class Reducer3 extends Reducer<Text, Text, Text, Text> {
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value + ",");
        }
        String result = null;
        if (sb.toString().endsWith(",")) {
            result = sb.substring(0, sb.length() - 1);
        }
        outKey.set(key);
        outValue.set(result);
        context.write(outKey, outValue);
    }
}

MR3

package org.hadoop.mrs.contentCF.step3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;

import java.net.URI;

/**
 *
 * Created by shirukai on 2017/11/12.
 */
public class MR3 {
    private static Logger logger = Logger.getLogger("MR3");
    //Relative path of the input file (the original Item Profile matrix)
    private static String inPath = "/contentCF/step1/step1_input/contentCF.txt";
    //Relative path of the output directory
    private static String outPath = "/contentCF/step3/step3_output";

    //Use the User Profile matrix output by step 2 as the distributed cache
    private static String cache = "/contentCF/step2/step2_output/part-r-00000";

    public static int run(){
        try {
            //Create the job configuration
            Configuration conf = Conf.get();
            //Create a job instance
            Job job = Job.getInstance(conf,"step3");

            //Add the step 2 output to the distributed cache, symlinked as "cache3"
            job.addCacheFile(new URI(cache+"#cache3"));
            //Set the job's main class
            job.setJarByClass(MR3.class);
            //Set the job's Mapper and Reducer classes
            job.setMapperClass(Mapper3.class);
            job.setReducerClass(Reducer3.class);

            //Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inputPath = new Path(inPath);
            if (fs.exists(inputPath)){
                FileInputFormat.addInputPath(job,inputPath);
            }
            Path outputPath = new Path(outPath);
            //Delete any previous output so the job can run again
            fs.delete(outputPath,true);

            FileOutputFormat.setOutputPath(job,outputPath);
            return job.waitForCompletion(true)?1:-1;
        }catch (Exception e){
            logger.error(e.getMessage());
        }
        return -1;

    }
}

RunContentCF

package org.hadoop.mrs.contentCF;

import org.hadoop.files.Files;
import org.hadoop.mrs.contentCF.step1.MR1;
import org.hadoop.mrs.contentCF.step2.MR2;
import org.hadoop.mrs.contentCF.step3.MR3;

/**
 * Run the content-based CF jobs end to end.
 * Created by shirukai on 2017/11/22.
 */
public class RunContentCF {
    private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger("RunContentCF");
    public static void Run() {
        //Start time
        long startTime = System.currentTimeMillis();
        //Create the HDFS directories
        try {
            Files.deleteFile("/contentCF");
            Files.mkdirFolder("/contentCF");
            for (int i = 1; i < 3; i++) {
                Files.mkdirFolder("/contentCF/step" + i);
                Files.mkdirFolder("/contentCF/step" + i + "/step" + i + "_input");
                Files.mkdirFolder("/contentCF/step" + i + "/step" + i + "_output");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        //Upload the input files
        Files.uploadFile("D:\\Hadoop\\upload\\", "contentCF.txt", "/contentCF/step1/step1_input/");
        Files.uploadFile("D:\\Hadoop\\upload\\", "contentUserCF.txt", "/contentCF/step2/step2_input/");
        //Run step 1
        int step1 = MR1.run();
        if (step1 == 1) {
            //Print the output file on success
            Files.readOutFile("/contentCF/step1/step1_output/part-r-00000");
        }
        //Run step 2
        int step2 = MR2.run();
        if (step2 == 1) {
            //Print the output file on success
            Files.readOutFile("/contentCF/step2/step2_output/part-r-00000");
        }
        //Run step 3
        int step3 = MR3.run();
        if (step3 == 1) {
            //Print the output file on success
            Files.readOutFile("/contentCF/step3/step3_output/part-r-00000");
        }
        //End time
        long endTime = System.currentTimeMillis();
        logger.info("Job finished in " + (endTime - startTime) / 1000 + " s");
    }
}
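
The pipeline reuses the Conf and Files helper classes from the author's earlier Hadoop posts; they are not shown here. A minimal, hypothetical launcher, assuming those helpers are on the classpath, could look like this:

package org.hadoop.mrs;

import org.hadoop.mrs.contentCF.RunContentCF;

//Hypothetical entry point: runs the three content-based CF jobs in order.
public class Main {
    public static void main(String[] args) {
        RunContentCF.Run();
    }
}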

Run results:
