This project has retired. For details please refer to its Attic page.
SimplePageRankComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.aggregators.DoubleMaxAggregator;
22  import org.apache.giraph.aggregators.DoubleMinAggregator;
23  import org.apache.giraph.aggregators.LongSumAggregator;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.edge.EdgeFactory;
26  import org.apache.giraph.graph.BasicComputation;
27  import org.apache.giraph.graph.Vertex;
28  import org.apache.giraph.io.VertexReader;
29  import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
30  import org.apache.giraph.io.formats.TextVertexOutputFormat;
31  import org.apache.giraph.master.DefaultMasterCompute;
32  import org.apache.giraph.worker.WorkerContext;
33  import org.apache.hadoop.io.DoubleWritable;
34  import org.apache.hadoop.io.FloatWritable;
35  import org.apache.hadoop.io.LongWritable;
36  import org.apache.hadoop.io.Text;
37  import org.apache.hadoop.mapreduce.InputSplit;
38  import org.apache.hadoop.mapreduce.TaskAttemptContext;
39  import org.apache.log4j.Logger;
40  
41  import com.google.common.collect.Lists;
42  
43  import java.io.IOException;
44  import java.util.List;
45  
46  /**
47   * Demonstrates the basic Pregel PageRank implementation.
48   */
49  @Algorithm(
50      name = "Page rank"
51  )
52  public class SimplePageRankComputation extends BasicComputation<LongWritable,
53      DoubleWritable, FloatWritable, DoubleWritable> {
54    /** Number of supersteps for this test */
55    public static final int MAX_SUPERSTEPS = 30;
56    /** Logger */
57    private static final Logger LOG =
58        Logger.getLogger(SimplePageRankComputation.class);
59    /** Sum aggregator name */
60    private static String SUM_AGG = "sum";
61    /** Min aggregator name */
62    private static String MIN_AGG = "min";
63    /** Max aggregator name */
64    private static String MAX_AGG = "max";
65  
66    @Override
67    public void compute(
68        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
69        Iterable<DoubleWritable> messages) throws IOException {
70      if (getSuperstep() >= 1) {
71        double sum = 0;
72        for (DoubleWritable message : messages) {
73          sum += message.get();
74        }
75        DoubleWritable vertexValue =
76            new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
77        vertex.setValue(vertexValue);
78        aggregate(MAX_AGG, vertexValue);
79        aggregate(MIN_AGG, vertexValue);
80        aggregate(SUM_AGG, new LongWritable(1));
81        LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
82            " max=" + getAggregatedValue(MAX_AGG) +
83            " min=" + getAggregatedValue(MIN_AGG));
84      }
85  
86      if (getSuperstep() < MAX_SUPERSTEPS) {
87        long edges = vertex.getNumEdges();
88        sendMessageToAllEdges(vertex,
89            new DoubleWritable(vertex.getValue().get() / edges));
90      } else {
91        vertex.voteToHalt();
92      }
93    }
94  
95    /**
96     * Worker context used with {@link SimplePageRankComputation}.
97     */
98    public static class SimplePageRankWorkerContext extends
99        WorkerContext {
100     /** Final max value for verification for local jobs */
101     private static double FINAL_MAX;
102     /** Final min value for verification for local jobs */
103     private static double FINAL_MIN;
104     /** Final sum value for verification for local jobs */
105     private static long FINAL_SUM;
106 
107     public static double getFinalMax() {
108       return FINAL_MAX;
109     }
110 
111     public static double getFinalMin() {
112       return FINAL_MIN;
113     }
114 
115     public static long getFinalSum() {
116       return FINAL_SUM;
117     }
118 
119     @Override
120     public void preApplication()
121       throws InstantiationException, IllegalAccessException {
122     }
123 
124     @Override
125     public void postApplication() {
126       FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
127       FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
128       FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
129 
130       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
131       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
132       LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
133     }
134 
135     @Override
136     public void preSuperstep() {
137       if (getSuperstep() >= 3) {
138         LOG.info("aggregatedNumVertices=" +
139             getAggregatedValue(SUM_AGG) +
140             " NumVertices=" + getTotalNumVertices());
141         if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
142             getTotalNumVertices()) {
143           throw new RuntimeException("wrong value of SumAggreg: " +
144               getAggregatedValue(SUM_AGG) + ", should be: " +
145               getTotalNumVertices());
146         }
147         DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
148         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
149         DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
150         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
151       }
152     }
153 
154     @Override
155     public void postSuperstep() { }
156   }
157 
158   /**
159    * Master compute associated with {@link SimplePageRankComputation}.
160    * It registers required aggregators.
161    */
162   public static class SimplePageRankMasterCompute extends
163       DefaultMasterCompute {
164     @Override
165     public void initialize() throws InstantiationException,
166         IllegalAccessException {
167       registerAggregator(SUM_AGG, LongSumAggregator.class);
168       registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
169       registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
170     }
171   }
172 
173   /**
174    * Simple VertexReader that supports {@link SimplePageRankComputation}
175    */
176   public static class SimplePageRankVertexReader extends
177       GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
178     /** Class logger */
179     private static final Logger LOG =
180         Logger.getLogger(SimplePageRankVertexReader.class);
181 
182     @Override
183     public boolean nextVertex() {
184       return totalRecords > recordsRead;
185     }
186 
187     @Override
188     public Vertex<LongWritable, DoubleWritable, FloatWritable>
189     getCurrentVertex() throws IOException {
190       Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
191           getConf().createVertex();
192       LongWritable vertexId = new LongWritable(
193           (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
194       DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
195       long targetVertexId =
196           (vertexId.get() + 1) %
197           (inputSplit.getNumSplits() * totalRecords);
198       float edgeValue = vertexId.get() * 100f;
199       List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
200       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
201           new FloatWritable(edgeValue)));
202       vertex.initialize(vertexId, vertexValue, edges);
203       ++recordsRead;
204       if (LOG.isInfoEnabled()) {
205         LOG.info("next: Return vertexId=" + vertex.getId().get() +
206             ", vertexValue=" + vertex.getValue() +
207             ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
208       }
209       return vertex;
210     }
211   }
212 
213   /**
214    * Simple VertexInputFormat that supports {@link SimplePageRankComputation}
215    */
216   public static class SimplePageRankVertexInputFormat extends
217     GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
218     @Override
219     public VertexReader<LongWritable, DoubleWritable,
220     FloatWritable> createVertexReader(InputSplit split,
221       TaskAttemptContext context)
222       throws IOException {
223       return new SimplePageRankVertexReader();
224     }
225   }
226 
227   /**
228    * Simple VertexOutputFormat that supports {@link SimplePageRankComputation}
229    */
230   public static class SimplePageRankVertexOutputFormat extends
231       TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
232     @Override
233     public TextVertexWriter createVertexWriter(TaskAttemptContext context)
234       throws IOException, InterruptedException {
235       return new SimplePageRankVertexWriter();
236     }
237 
238     /**
239      * Simple VertexWriter that supports {@link SimplePageRankComputation}
240      */
241     public class SimplePageRankVertexWriter extends TextVertexWriter {
242       @Override
243       public void writeVertex(
244           Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
245         throws IOException, InterruptedException {
246         getRecordWriter().write(
247             new Text(vertex.getId().toString()),
248             new Text(vertex.getValue().toString()));
249       }
250     }
251   }
252 }