• 热门专题

PageRank算法的MapReduce实现

作者:  发布日期:2014-12-22 21:16:18
Tag标签:算法  
  • 假设目前需要计算排名的网页只有4个,数据如下(每行格式为:网页名、制表符、初始PR值、空格、以逗号分隔的外链列表):

    baidu	10.00 google,sina,nefu
    google	10.00 baidu
    sina	10.00 google
    nefu	10.00 sina,google

    1. baidu 存在三个外链接

    2.google 存在1个外链接

    3.sina 存在1个外链接

    4. nefu 存在2个外链接

    由数据可以看出:其余三个网页(baidu、sina、nefu)都有链接指向google,所以google的PR应该最高;而google唯一的外链指向baidu,因此baidu的PR值也应该很高。

    代码如下:

    package PageRank;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    public class PageRank {
    	/**
    	 * @author XD
    	 */
    	static enum PageCount{
    		Count,TotalPR
    	}
    	public static class  Map extends Mapper < LongWritable , Text , Text , Text >{
    		protected void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException{
    			context.getCounter(PageCount.Count).increment(1);
    			String[] kv = value.toString().split("\t");
    			String _key = kv[0];
    			String _value = kv[1];
    			String _PRnLink[] = _value.split(" ");
    			String pr = _PRnLink[0];
    			String link = _PRnLink[1];
    			context.write(new Text(_key),new Text(link));
    			String[] site = link.split(",");
    			float score = Float.valueOf(pr)/(site.length)*1.0f;
    			for(int i=0;i<site.length;i++){
    				context.write(new Text(site[i]), new Text(String.valueOf(score)));
    			}
    		}
    	}
    	public static class Reduce extends Reducer < Text , Text , Text, Text>{
    		protected void reduce(Text key , Iterable<Text> values ,Context context) throws IOException, InterruptedException{
    			StringBuilder sb = new StringBuilder();
    			float factor  = 0.85f;	//阻尼因子
    			float pr = 0f;
    			for(Text val : values){
    				String value = val.toString();
    				int s = value.indexOf(".");
    				if(s != -1){
    					pr += Float.valueOf(value);
    				}else{
    					String site[] = value.split(",");
    					int _len = site.length;
    					for(int k=0;k<_len;k++){
    						sb.append(site[k]);
    						sb.append(",");
    					}
    				}
    			}
    			pr = ((1-factor)+(factor*(pr)));
    			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
    			String output = pr+" "+sb.toString();
    			context.write(key, new Text(output));
    		}
    	}
    	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
    		// TODO Auto-generated method stub
    		String input,output;
    		int threshold = 100;
    		int iteration = 0;
    		int iterationLimit = 10;
    	
    		boolean status = false;
    		
    		while(iteration < iterationLimit){
    			//展开反复迭代  注意 输入输出的路径 
    			if((iteration % 2) == 0){
    				input = "hdfs://localhost:9000/output_pr/p*";
    				output = "hdfs://localhost:9000/output_pr2";
    			}else{
    				input = "hdfs://localhost:9000/output_pr2/p*";
    				output = "hdfs://localhost:9000/output_pr";
    			}
    			Configuration conf = new Configuration();
    			final FileSystem filesystem = FileSystem.get(new URI(input),conf);
    			final Path outPath = new Path(output);
    			if(filesystem.exists(outPath)){
    				filesystem.delete(outPath, true);
    			}
    			Job job = new Job(conf,PageRank.class.getSimpleName());
    			
    			//1.1 读取文件 位置
    			FileInputFormat.setInputPaths(job, input);
    			
    			//1.2指定的map类//1.3 map输出的key value 类型 要是和最终的输出类型是一样的 可以省略
    			job.setMapperClass(Map.class);
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(Text.class);
    			job.setJarByClass(PageRank.class);
    			
    			//1.3 分区
    			job.setPartitionerClass(HashPartitioner.class);
    			
    			job.setReducerClass(Reduce.class);
    			//指定 reduce的输出类型
    			job.setOutputKeyClass(Text.class);
    			job.setOutputValueClass(Text.class);
    			
    			//指定写出到什么位置
    			FileOutputFormat.setOutputPath(job, new Path(output));
    			status = job.waitForCompletion(true);
    			iteration++;
    			long count = job.getCounters().findCounter(PageCount.Count).getValue();
    			long TotalPr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
    			System.out.println("PageCount:"+count);
    			System.out.println("TotalPR:"+TotalPr);
    			double per_pr = TotalPr/(count*1.0d);
    			System.out.println("PEr_er:"+per_pr);
    			if((int)per_pr == threshold){
    				System.out.println("Iteration:"+iteration);
    				break;
    			}	
    		}
            System.exit(status?0:1);
    	}
    }
    

    最后输出结果如下:


About IT165 - 广告服务 - 隐私声明 - 版权申明 - 免责条款 - 网站地图 - 网友投稿 - 联系方式
本站内容来自于互联网,仅供用于网络技术学习,学习中请遵循相关法律法规