Algorithm idea: recommend to a user the items liked by other users whose interests are similar to his.
Theory
Suppose we have the following users, items, behaviors, and behavior weights.
Behavior list
Algorithm steps
1. Compute the user-item rating matrix from the user behavior list.
2. Compute the user-user similarity matrix from the rating matrix (a minimal cosine-similarity sketch follows this list).
The user-user similarity matrix
3. Similarity matrix × rating matrix = recommendation list.
The resulting recommendation list
4. Filter the recommendation list against the rating matrix.
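Before the MapReduce version, here is a minimal single-machine sketch of what step 2 computes: the cosine of the angle between two users' rating vectors, cos(u,v) = u·v / (|u|·|v|), where only co-rated items contribute to the dot product. The class and method names below are illustrative only (they are not part of the MapReduce project), and the sample vectors are user A's and user E's rating rows from the step 1 output shown later.
import java.util.HashMap;
import java.util.Map;
/**
 * Minimal sketch of the step 2 computation (illustrative only).
 */
public class CosineSketch {
    //cos(u,v) = u·v / (|u| * |v|), where only co-rated items contribute to u·v
    static double cosine(Map<String, Integer> u, Map<String, Integer> v) {
        double dot = 0, normU = 0, normV = 0;
        for (Map.Entry<String, Integer> e : u.entrySet()) {
            normU += e.getValue() * e.getValue();
            Integer w = v.get(e.getKey());
            if (w != null) {
                dot += e.getValue() * w;//co-rated item
            }
        }
        for (int w : v.values()) {
            normV += w * w;
        }
        return dot / (Math.sqrt(normU) * Math.sqrt(normV));
    }
    public static void main(String[] args) {
        //user A's rating row from step 1: item 1 -> 1, item 3 -> 5, item 4 -> 3
        Map<String, Integer> a = new HashMap<String, Integer>();
        a.put("1", 1);
        a.put("3", 5);
        a.put("4", 3);
        //user E's rating row from step 1: item 3 -> 5, item 4 -> 1
        Map<String, Integer> e = new HashMap<String, Integer>();
        e.put("3", 5);
        e.put("4", 1);
        //28 / (sqrt(35) * sqrt(26)) ≈ 0.93, matching A_0.93 in the step 2 output
        System.out.printf("cos(A,E) = %.2f%n", cosine(a, e));
    }
}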
Code implementation
Step 1: Build the rating matrix from the user behavior list
Mapper1
package org.hadoop.mrs.userCF.step1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper1: parse each behavior record and emit (userID, itemID_score)
* Created by shirukai on 2017/11/12.
*/
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
//output key
private Text outKey = new Text();
//output value
private Text outValue = new Text();
/**
*
* @param key byte offset of the line within the input file
* @param value one line of input text, e.g. A,1,1 (userID,itemID,score)
* @param context context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(",");
String userID = values[0];
String itemID = values[1];
String score = values[2];
outKey.set(userID);
outValue.set(itemID+"_"+score);
context.write(outKey,outValue);
}
}
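For example, with the sample data the input line A,1,1 becomes the pair (A, 1_1); the shuffle phase then groups every itemID_score value of the same user for the reducer.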
Reducer1
package org.hadoop.mrs.userCF.step1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Reducer1: sum each item's scores for one user and join them into itemID_score pairs
* Created by shirukai on 2017/11/12.
*/
public class Reducer1 extends Reducer<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String userID = key.toString();
//itemID -> accumulated score
Map<String, Integer> map = new HashMap<String, Integer>();
for (Text value :values){
String itemId = value.toString().split("_")[0];
String score = value.toString().split("_")[1];
if (map.get(itemId) == null){
map.put(itemId,Integer.valueOf(score));
}else {
Integer preScore = map.get(itemId);
map.put(itemId,preScore+Integer.valueOf(score));
}
}
StringBuilder sBuilder = new StringBuilder();
for (Map.Entry<String,Integer> entry: map.entrySet()) {
String itemID = entry.getKey();
String score = String.valueOf(entry.getValue());
sBuilder.append(itemID+"_"+score+",");
}
String line = null;
//strip the trailing ","
if (sBuilder.toString().endsWith(",")){
line = sBuilder.substring(0,sBuilder.length()-1);
}
outKey.set(userID);
outValue.set(line);
context.write(outKey,outValue);
}
}
MR1
package org.hadoop.mrs.userCF.step1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;
import java.io.IOException;
/**
*
* Created by shirukai on 2017/11/12.
*/
public class MR1 {
private static Logger logger = Logger.getLogger("MR1");
//input path on HDFS
private static String inPath = "/userCF/step1/step1_input/userCF.txt";
//output path on HDFS
private static String outPath = "/userCF/step1/step1_output";
public static int run(){
try {
Configuration conf = Conf.get();
//create a job instance
Job job = Job.getInstance(conf,"step1");
//set the job's main class
job.setJarByClass(MR1.class);
//set the Mapper and Reducer classes
job.setMapperClass(Mapper1.class);
job.setReducerClass(Reducer1.class);
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileSystem fs = FileSystem.get(conf);
Path inputPath = new Path(inPath);
if (fs.exists(inputPath)){
FileInputFormat.addInputPath(job,inputPath);
}
Path outputPath = new Path(outPath);
//delete any existing output directory so the job can be rerun
fs.delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
return job.waitForCompletion(true)?1:-1;
}catch (IOException e){
logger.error(e.getMessage());
}catch (ClassNotFoundException e){
logger.error(e.getMessage());
}catch (InterruptedException e){
logger.error(e.getMessage());
}
return -1;
}
}
Output after running step 1:
A 1_1,3_5,4_3
B 2_3,5_3
C 1_5,6_10
D 1_10,5_5
E 3_5,4_1
F 2_5,3_3,6_1
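Each line is one row of the user-item rating matrix: A 1_1,3_5,4_3 means user A has an accumulated score of 1 on item 1, 5 on item 3, and 3 on item 4.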
Step 2: Build the user-user similarity matrix from the rating matrix
Mapper2
package org.hadoop.mrs.userCF.step2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
/**
* Compute the user-user similarity matrix from the user-item rating matrix:
* cosine similarity between every pair of rating-matrix rows
* Created by shirukai on 2017/11/12.
*/
public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
private static Logger logger = Logger.getLogger("Mapper2");
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();
private DecimalFormat df = new DecimalFormat("0.00");
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
//read the cached rating matrix (the right-hand matrix) into a List<String>
FileReader fr = new FileReader("itemUserScore1");
BufferedReader br = new BufferedReader(fr);
String line = null;
while ((line = br.readLine()) != null) {
cacheList.add(line);
}
br.close();
fr.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] mapLine = value.toString().split("\t");
String mapIndex = mapLine[0];
String[] mapKeyAndValues = mapLine[1].split(",");
//iterate over the cached rows of the rating matrix
for (String cache : cacheList) {
int num = 0;//numerator: dot product of the two rating vectors
Double cacheDen = 0D;
Double mapDen = 0D;
Double den;//denominator: product of the two vector norms
Double cos;//cosine similarity
String[] cacheLine = cache.split("\t");
String cacheIndex = cacheLine[0];
String[] cacheKeyAndValues = cacheLine[1].split(",");
//accumulate the squared norm of the cached row
for (String cacheKeyAndValue : cacheKeyAndValues) {
String cacheScore = cacheKeyAndValue.split("_")[1];
cacheDen += Double.valueOf(cacheScore) * Double.valueOf(cacheScore);
}
//iterate over this user's itemID_score pairs
for (String mapKeyAndValue : mapKeyAndValues) {
String mapItemId = mapKeyAndValue.split("_")[0];
String mapScore = mapKeyAndValue.split("_")[1];
//look for the same item in the cached row
for (String cacheKeyAndValue : cacheKeyAndValues) {
String cacheItemId = cacheKeyAndValue.split("_")[0];
String cacheScore = cacheKeyAndValue.split("_")[1];
if (mapItemId.equals(cacheItemId)) {
//co-rated item: add its contribution to the dot product
num += Integer.valueOf(mapScore) * Integer.valueOf(cacheScore);
}
}
mapDen += Double.valueOf(mapScore) * Double.valueOf(mapScore);
}
//no co-rated items means similarity 0: skip this pair
if (num == 0) {
continue;
}
den = Math.sqrt(mapDen) * Math.sqrt(cacheDen);
cos = num / den;
outKey.set(mapIndex);
outValue.set(cacheIndex + "_" + df.format(cos));
context.write(outKey, outValue);
}
}
}
Reducer2
package org.hadoop.mrs.userCF.step2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer2: join one user's similarity pairs into a single line
* Created by shirukai on 2017/11/12.
*/
public class Reducer2 extends Reducer<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value + ",");
}
String result = null;
if (sb.toString().endsWith(",")) {
result = sb.substring(0, sb.length() - 1);
}
outKey.set(key);
outValue.set(result);
context.write(outKey, outValue);
}
}
MR2
package org.hadoop.mrs.userCF.step2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;
import java.net.URI;
/**
*
* Created by shirukai on 2017/11/12.
*/
public class MR2 {
private static Logger logger = Logger.getLogger("MR2");
//input path on HDFS
private static String inPath = "/userCF/step1/step1_output/part-r-00000";
//output path on HDFS
private static String outPath = "/userCF/step2/step2_output";
//use the rating matrix output by step1 as a distributed cache file
private static String cache = "/userCF/step1/step1_output/part-r-00000";
public static int run(){
try {
//create the job configuration
Configuration conf = Conf.get();
//create a job instance
Job job = Job.getInstance(conf,"step2");
//add the distributed cache file (a plain text file, so addCacheFile rather than addCacheArchive)
job.addCacheFile(new URI(cache+"#itemUserScore1"));
//set the job's main class
job.setJarByClass(MR2.class);
//set the Mapper and Reducer classes
job.setMapperClass(Mapper2.class);
job.setReducerClass(Reducer2.class);
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileSystem fs = FileSystem.get(conf);
Path inputPath = new Path(inPath);
if (fs.exists(inputPath)){
FileInputFormat.addInputPath(job,inputPath);
}
Path outputPath = new Path(outPath);
fs.delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
return job.waitForCompletion(true)?1:-1;
}catch (Exception e){
logger.error(e.getMessage());
}
return -1;
}
}
Output after running step 2:
A A_1.00,C_0.08,D_0.15,E_0.93,F_0.43
B B_1.00,D_0.32,F_0.60
C F_0.15,D_0.40,C_1.00,A_0.08
D D_1.00,C_0.40,B_0.32,A_0.15
E A_0.93,E_1.00,F_0.50
F C_0.15,E_0.50,F_1.00,A_0.43,B_0.60
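A quick sanity check against the step 1 output: A = (1:1, 3:5, 4:3) and E = (3:5, 4:1) share items 3 and 4, so their dot product is 5×5 + 3×1 = 28, and 28 / (√35 × √26) ≈ 0.93, which matches A_0.93 in row E and E_0.93 in row A.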
Step 3: Transpose the rating matrix (so that step 4 can multiply a similarity row by an item column)
Mapper3
package org.hadoop.mrs.userCF.step3;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Transpose the rating matrix: for each itemID_score pair in a user's row, emit (itemID, userID_score)
* Created by shirukai on 2017/11/13.
*/
public class Mapper3 extends Mapper<LongWritable,Text,Text,Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] mapLine = value.toString().split("\t");
String mapIndex = mapLine[0];
String[] keyAndValues = mapLine[1].split(",");
for (String keyAndValue:keyAndValues) {
String mapKey = keyAndValue.split("_")[0];
String mapValue = keyAndValue.split("_")[1];
outKey.set(mapKey);
outValue.set(mapIndex+"_"+mapValue);
context.write(outKey,outValue);
}
}
}
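For instance, the row A 1_1,3_5,4_3 is emitted as (1, A_1), (3, A_5), and (4, A_3); after the reduce, each output line holds one item's column of the original rating matrix.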
Reducer3
package org.hadoop.mrs.userCF.step3;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer3: join one item's userID_score pairs into a single line of the transposed matrix
* Created by shirukai on 2017/11/13.
*/
public class Reducer3 extends Reducer<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values){
sb.append(value+",");
}
String result = null;
if (sb.toString().endsWith(",")){
result = sb.substring(0,sb.length()-1);
}
//outKey: row (itemID), outValue: column_value (userID_score)
outKey.set(key);
outValue.set(result);
context.write(outKey,outValue);
}
}
MR3
package org.hadoop.mrs.userCF.step3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;
import java.io.IOException;
/**
*
* Created by shirukai on 2017/11/13.
*/
public class MR3 {
private static Logger logger = Logger.getLogger("MR3");
//input path on HDFS
private static String inPath = "/userCF/step1/step1_output/part-r-00000";
//output path on HDFS
private static String outPath = "/userCF/step3/step3_output";
public static int run(){
try {
Configuration conf = Conf.get();
//create a job instance
Job job = Job.getInstance(conf,"step3");
//set the job's main class
job.setJarByClass(MR3.class);
//set the Mapper and Reducer classes
job.setMapperClass(Mapper3.class);
job.setReducerClass(Reducer3.class);
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileSystem fs = FileSystem.get(conf);
Path inputPath = new Path(inPath);
if (fs.exists(inputPath)){
FileInputFormat.addInputPath(job,inputPath);
}
Path outputPath = new Path(outPath);
fs.delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
return job.waitForCompletion(true)?1:-1;
}catch (IOException e){
logger.error(e.getMessage());
}catch (ClassNotFoundException e){
logger.error(e.getMessage());
}catch (InterruptedException e){
logger.error(e.getMessage());
}
return -1;
}
}
Output after running step 3:
1 D_10,A_1,C_5
2 F_5,B_3
3 A_5,F_3,E_5
4 E_1,A_3
5 D_5,B_3
6 F_1,C_10
Step 4: Multiply the user-user similarity matrix by the rating matrix
Mapper4
package org.hadoop.mrs.userCF.step4;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
/**
* Mapper4: multiply one row of the user-user similarity matrix by each column of the rating matrix
* Created by shirukai on 2017/11/13.
*/
public class Mapper4 extends Mapper<LongWritable,Text,Text,Text> {
private static Logger logger = Logger.getLogger("Mapper4");
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();
private DecimalFormat df = new DecimalFormat("0.00");
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
//read the cached transposed rating matrix (the right-hand matrix) into a List<String>
FileReader fr = new FileReader("itemUserScore4");
BufferedReader br = new BufferedReader(fr);
//each cached line looks like: 1  D_10,A_1,C_5 (itemID, then userID_score pairs)
String line = null;
while ((line = br.readLine()) != null) {
cacheList.add(line);
}
br.close();
fr.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] mapLine = value.toString().split("\t");
String mapIndex = mapLine[0];
String[] mapKeyAndValues = mapLine[1].split(",");
//iterate over the cached item rows (columns of the original rating matrix)
for (String cache : cacheList) {
Double result = 0D;
String[] cacheLine = cache.split("\t");
String cacheIndex = cacheLine[0];
String[] cacheKeyAndValues = cacheLine[1].split(",");
//iterate over the similarity pairs of this user's row
for (String mapKeyAndValue : mapKeyAndValues) {
String mapKey = mapKeyAndValue.split("_")[0];
String mapSimilarity = mapKeyAndValue.split("_")[1];
//iterate over the userID_score pairs of the cached row
for (String cacheKeyAndValue : cacheKeyAndValues) {
String cacheKey = cacheKeyAndValue.split("_")[0];
String cacheScore = cacheKeyAndValue.split("_")[1];
if (mapKey.equals(cacheKey)) {
//matching user: similarity * rating contributes to the dot product
result += Double.valueOf(mapSimilarity) * Double.valueOf(cacheScore);
}
}
}
if (result == 0) {
continue;
}
outKey.set(mapIndex);
outValue.set(cacheIndex + "_" + df.format(result));
context.write(outKey, outValue);
}
}
}
Reducer4
package org.hadoop.mrs.userCF.step4;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer4: join one user's itemID_score recommendation scores into a single line
* Created by shirukai on 2017/11/13.
*/
public class Reducer4 extends Reducer<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value + ",");
}
String result = null;
if (sb.toString().endsWith(",")) {
result = sb.substring(0, sb.length() - 1);
}
outKey.set(key);
outValue.set(result);
context.write(outKey, outValue);
}
}
MR4
package org.hadoop.mrs.userCF.step4;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;
import java.net.URI;
/**
* Created by shirukai on 2017/11/13.
*/
public class MR4 {
private static Logger logger = Logger.getLogger("MR4");
//input path on HDFS
private static String inPath = "/userCF/step2/step2_output/part-r-00000";
//output path on HDFS
private static String outPath = "/userCF/step4/step4_output";
//use the transposed rating matrix output by step3 as a distributed cache file
private static String cache = "/userCF/step3/step3_output/part-r-00000";
public static int run(){
try {
//create the job configuration
Configuration conf = Conf.get();
//create a job instance
Job job = Job.getInstance(conf,"step4");
//add the distributed cache file (a plain text file, so addCacheFile rather than addCacheArchive)
job.addCacheFile(new URI(cache+"#itemUserScore4"));
//set the job's main class
job.setJarByClass(MR4.class);
//set the Mapper and Reducer classes
job.setMapperClass(Mapper4.class);
job.setReducerClass(Reducer4.class);
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileSystem fs = FileSystem.get(conf);
Path inputPath = new Path(inPath);
if (fs.exists(inputPath)){
FileInputFormat.addInputPath(job,inputPath);
}
Path outputPath = new Path(outPath);
fs.delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
return job.waitForCompletion(true)?1:-1;
}catch (Exception e){
logger.error(e.getMessage());
}
return -1;
}
}
Output after running step 4:
A 1_2.90,2_2.15,3_10.94,4_3.93,5_0.75,6_1.23
B 1_3.20,2_6.00,3_1.80,5_4.60,6_0.60
C 6_10.15,5_2.00,4_0.24,3_0.85,2_0.75,1_9.08
D 1_12.15,6_4.00,5_5.96,4_0.45,3_0.75,2_0.96
E 1_0.93,2_2.50,3_11.15,4_3.79,6_0.50
F 1_1.18,2_6.80,3_7.65,4_1.79,5_1.80,6_2.50
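Sanity check: user B's similarity row is B_1.00,D_0.32,F_0.60 and item 1's transposed row is D_10,A_1,C_5; the only user they share is D, so B's score for item 1 is 0.32 × 10 = 3.20, matching 1_3.20 above.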
Step 5: Use the rating matrix to drop, from the step 4 output, the items each user has already acted on
Mapper5
package org.hadoop.mrs.userCF.step5;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
/**
* Mapper5: drop from the step 4 output every item that already appears in the user's rating row
* Created by shirukai on 2017/11/13.
*/
public class Mapper5 extends Mapper<LongWritable,Text,Text,Text> {
private static Logger logger = Logger.getLogger("Mapper5");
private Text outKey = new Text();
private Text outValue = new Text();
private List<String> cacheList = new ArrayList<String>();
private DecimalFormat df = new DecimalFormat("0.00");
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
//read the cached rating matrix from step1 into a List<String>
FileReader fr = new FileReader("itemUserScore5");
BufferedReader br = new BufferedReader(fr);
String line = null;
while ((line = br.readLine()) != null) {
cacheList.add(line);
}
br.close();
fr.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] mapLine = value.toString().split("\t");
String mapIndex = mapLine[0];
String[] mapKeyAndValues = mapLine[1].split(",");
//iterate over the cached rating rows
for (String cache : cacheList) {
String[] cacheLine = cache.split("\t");
String cacheIndex = cacheLine[0];
String[] cacheKeyAndValues = cacheLine[1].split(",");
//only the rating row of the same user is relevant
if (mapIndex.equals(cacheIndex)){
for (String mapKeyAndValue : mapKeyAndValues) {
boolean flag = false;
String mapKey = mapKeyAndValue.split("_")[0];
String mapSimilarity = mapKeyAndValue.split("_")[1];
//check whether this item already appears in the user's rating row
for (String cacheKeyAndValue : cacheKeyAndValues) {
String cacheKey = cacheKeyAndValue.split("_")[0];
if (mapKey.equals(cacheKey)) {
flag = true;
}
}
//keep only items the user has not acted on yet
if (!flag){
outKey.set(mapIndex);
outValue.set(mapKey+"_"+mapSimilarity);
context.write(outKey,outValue);
}
}
}
}
}
}
Reducer5
package org.hadoop.mrs.userCF.step5;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Reducer5: join one user's filtered recommendations into a single line
* Created by shirukai on 2017/11/13.
*/
public class Reducer5 extends Reducer<Text, Text, Text, Text> {
private Text outKey = new Text();
private Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value + ",");
}
String result = null;
if (sb.toString().endsWith(",")) {
result = sb.substring(0, sb.length() - 1);
}
outKey.set(key);
outValue.set(result);
context.write(outKey, outValue);
}
}
MR5
package org.hadoop.mrs.userCF.step5;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.hadoop.conf.Conf;
import java.net.URI;
/**
* Created by shirukai on 2017/11/13.
*/
public class MR5 {
private static Logger logger = Logger.getLogger("MR5");
//input path on HDFS
private static String inPath = "/userCF/step4/step4_output/part-r-00000";
//output path on HDFS
private static String outPath = "/userCF/step5/step5_output";
//use the rating matrix output by step1 as a distributed cache file
private static String cache = "/userCF/step1/step1_output/part-r-00000";
public static int run(){
try {
//create the job configuration
Configuration conf = Conf.get();
//create a job instance
Job job = Job.getInstance(conf,"step5");
//add the distributed cache file (a plain text file, so addCacheFile rather than addCacheArchive)
job.addCacheFile(new URI(cache+"#itemUserScore5"));
//set the job's main class
job.setJarByClass(MR5.class);
//set the Mapper and Reducer classes
job.setMapperClass(Mapper5.class);
job.setReducerClass(Reducer5.class);
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//set the input and output paths
FileSystem fs = FileSystem.get(conf);
Path inputPath = new Path(inPath);
if (fs.exists(inputPath)){
FileInputFormat.addInputPath(job,inputPath);
}
Path outputPath = new Path(outPath);
fs.delete(outputPath,true);
FileOutputFormat.setOutputPath(job,outputPath);
return job.waitForCompletion(true)?1:-1;
}catch (Exception e){
logger.error(e.getMessage());
}
return -1;
}
}
Output after running step 5 (the final recommendation lists):
A 2_2.15,5_0.75,6_1.23
B 1_3.20,3_1.80,6_0.60
C 2_0.75,3_0.85,4_0.24,5_2.00
D 2_0.96,3_0.75,4_0.45,6_4.00
E 1_0.93,2_2.50,6_0.50
F 1_1.18,4_1.79,5_1.80
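As expected, every item a user already acted on is gone: user A rated items 1, 3, and 4 in step 1, so only items 2, 5, and 6 survive in A's row.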
Create a RunUserCF class under the userCF package to run each step in order.
The RunUserCF class
package org.hadoop.mrs.userCF;
import org.apache.hadoop.fs.Path;
import org.hadoop.files.Files;
import org.hadoop.mrs.userCF.step1.MR1;
import org.hadoop.mrs.userCF.step2.MR2;
import org.hadoop.mrs.userCF.step3.MR3;
import org.hadoop.mrs.userCF.step4.MR4;
import org.hadoop.mrs.userCF.step5.MR5;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Run UserCF
* Created by shirukai on 2017/11/12.
*/
public class RunUserCF {
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger("RunUserCF");
public static void Run() {
//start time
long startTime = System.currentTimeMillis();
//create the working directories on HDFS
try {
Files.deleteFile("/userCF");
Files.mkdirFolder("/userCF");
for (int i = 1; i < 6; i++) {
Files.mkdirFolder("/userCF/step" + i);
Files.mkdirFolder("/userCF/step" + i + "/step" + i + "_input");
Files.mkdirFolder("/userCF/step" + i + "/step" + i + "_output");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
//upload the input file to HDFS
Files.uploadFile("D:\\Hadoop\\upload\\", "userCF.txt", "/userCF/step1/step1_input/");
//run step 1
int step1 = MR1.run();
if (step1 == 1) {
//print the output file on success
Files.readOutFile("/userCF/step1/step1_output/part-r-00000");
}
//run step 2
int step2 = MR2.run();
if (step2 == 1) {
//print the output file on success
Files.readOutFile("/userCF/step2/step2_output/part-r-00000");
}
//run step 3
int step3 = MR3.run();
if (step3 == 1) {
//print the output file on success
Files.readOutFile("/userCF/step3/step3_output/part-r-00000");
}
//run step 4
int step4 = MR4.run();
if (step4 == 1) {
//print the output file on success
Files.readOutFile("/userCF/step4/step4_output/part-r-00000");
}
//run step 5
int step5 = MR5.run();
if (step5 == 1) {
//print the output file on success
Files.readOutFile("/userCF/step5/step5_output/part-r-00000");
}
//end time
long endTime = System.currentTimeMillis();
logger.info("Total time: " + (endTime - startTime)/1000 + " s");
}
}
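The post never shows how RunUserCF is invoked; a minimal, hypothetical entry point (the Main class name and its package are assumptions for illustration) could look like this:
package org.hadoop.mrs;
import org.hadoop.mrs.userCF.RunUserCF;
/**
 * Hypothetical entry point; this class is an assumption, not part of the original project.
 */
public class Main {
    public static void main(String[] args) {
        //runs step1 through step5 in order and prints each step's output file
        RunUserCF.Run();
    }
}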