This project has retired. For details please refer to its Attic page.
PageRankWithKryoSimpleWritable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import com.google.common.collect.Lists;
22  import org.apache.giraph.aggregators.BasicAggregator;
23  import org.apache.giraph.aggregators.LongSumAggregator;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.edge.EdgeFactory;
26  import org.apache.giraph.graph.BasicComputation;
27  import org.apache.giraph.graph.Vertex;
28  import org.apache.giraph.io.VertexReader;
29  import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
30  import org.apache.giraph.master.DefaultMasterCompute;
31  import org.apache.giraph.worker.WorkerContext;
32  import org.apache.giraph.writable.kryo.KryoSimpleWritable;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.mapreduce.InputSplit;
35  import org.apache.hadoop.mapreduce.TaskAttemptContext;
36  import org.apache.log4j.Logger;
37  
38  import java.io.IOException;
39  import java.util.ArrayList;
40  import java.util.List;
41  import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue;
42  import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue;
43  import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue;
44  
45  /**
46   * Copy of SimplePageRank, modified to test vertex/edge and
47   * message values that derives from KryoSimpleWritable.
48   */
49  @Algorithm(
50          name = "Page rank"
51  )
52  public class PageRankWithKryoSimpleWritable extends
53          BasicComputation<LongWritable, VertexValue,
54          EdgeValue, MessageValue> {
55    /** Number of supersteps for this test */
56    public static final int MAX_SUPERSTEPS = 30;
57    /** Number of supersteps for this static  3;
58    /** Logger */
59    private static final Logger LOG =
60            Logger.getLogger(PageRankWithKryoSimpleWritable.class);
61    /** Sum aggregator name */
62    private static String SUM_AGG = "sum";
63    /** Min aggregator name */
64    private static String MIN_AGG = "min";
65    /** Max aggregator name */
66    private static String MAX_AGG = "max";
67  
68    @Override
69    public void compute(
70            Vertex<LongWritable, VertexValue,
71            EdgeValue> vertex,
72            Iterable<MessageValue> messages) throws IOException {
73      if (getSuperstep() >= 1) {
74        double sum = 0;
75        for (MessageValue message : messages) {
76          sum += message.get();
77        }
78        Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum;
79        VertexValue vertexValue = new VertexValue(value);
80        vertex.setValue(vertexValue);
81        aggregate(MAX_AGG, vertexValue);
82        aggregate(MIN_AGG, vertexValue);
83        aggregate(SUM_AGG, new LongWritable(1));
84        LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
85                " max=" + getAggregatedValue(MAX_AGG) +
86                " min=" + getAggregatedValue(MIN_AGG));
87      }
88  
89      if (getSuperstep() < MAX_SUPERSTEPS) {
90        long edges = vertex.getNumEdges();
91        sendMessageToAllEdges(vertex,
92            new MessageValue(vertex.getValue().get() / edges));
93      } else {
94        vertex.voteToHalt();
95      }
96    }
97  
98    /**
99     * Worker context used with {@link PageRankWithKryoSimpleWritable}.
100    */
101   public static class PageRankWithKryoWorkerContext extends
102           WorkerContext {
103     /** Final max value for verification for local jobs */
104     private static double FINAL_MAX;
105     /** Final min value for verification for local jobs */
106     private static double FINAL_MIN;
107     /** Final sum value for verification for local jobs */
108     private static long FINAL_SUM;
109 
110     public static double getFinalMax() {
111       return FINAL_MAX;
112     }
113 
114     public static double getFinalMin() {
115       return FINAL_MIN;
116     }
117 
118     public static long getFinalSum() {
119       return FINAL_SUM;
120     }
121 
122     @Override
123     public void preApplication()
124             throws InstantiationException, IllegalAccessException {
125     }
126 
127     @Override
128     public void postApplication() {
129       FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
130       FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get();
131       FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get();
132 
133       LOG.info("aggregatedNumVertices=" + FINAL_SUM);
134       LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
135       LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
136     }
137 
138     @Override
139     public void preSuperstep() {
140       if (getSuperstep() >= 3) {
141         LOG.info("aggregatedNumVertices=" +
142                 getAggregatedValue(SUM_AGG) +
143                 " NumVertices=" + getTotalNumVertices());
144         if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
145                 getTotalNumVertices()) {
146           throw new RuntimeException("wrong value of SumAggreg: " +
147                   getAggregatedValue(SUM_AGG) + ", should be: " +
148                   getTotalNumVertices());
149         }
150         VertexValue maxPagerank = getAggregatedValue(MAX_AGG);
151         LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
152         VertexValue minPagerank = getAggregatedValue(MIN_AGG);
153         LOG.info("aggregatedMinPageRank=" + minPagerank.get());
154       }
155     }
156 
157     @Override
158     public void postSuperstep() { }
159   }
160 
161   /**
162    * Master compute associated with {@link PageRankWithKryoSimpleWritable}.
163    * It registers required aggregators.
164    */
165   public static class PageRankWithKryoMasterCompute extends
166           DefaultMasterCompute {
167     @Override
168     public void initialize() throws InstantiationException,
169             IllegalAccessException {
170       registerAggregator(SUM_AGG, LongSumAggregator.class);
171       registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class);
172       registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class);
173     }
174   }
175 
176   /**
177    * Simple VertexReader that supports {@link PageRankWithKryoSimpleWritable}
178    */
179   public static class PageRankWithKryoVertexReader extends
180           GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> {
181     /** Class logger */
182     private static final Logger LOG =
183         Logger.getLogger(
184           PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class);
185 
186     @Override
187     public boolean nextVertex() {
188       return totalRecords > recordsRead;
189     }
190 
191     @Override
192     public Vertex<LongWritable, VertexValue, EdgeValue>
193     getCurrentVertex() throws IOException {
194       Vertex<LongWritable, VertexValue, EdgeValue> vertex =
195               getConf().createVertex();
196       LongWritable vertexId = new LongWritable(
197               (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
198       VertexValue vertexValue = new VertexValue(vertexId.get() * 10d);
199       long targetVertexId =
200               (vertexId.get() + 1) %
201                       (inputSplit.getNumSplits() * totalRecords);
202       float edgeValue = vertexId.get() * 100f;
203       List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList();
204       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
205               new EdgeValue(edgeValue)));
206       vertex.initialize(vertexId, vertexValue, edges);
207       ++recordsRead;
208       if (LOG.isInfoEnabled()) {
209         LOG.info("next: Return vertexId=" + vertex.getId().get() +
210           ", vertexValue=" + vertex.getValue() +
211           ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
212       }
213       return vertex;
214     }
215   }
216 
217   /**
218    *  VertexInputFormat that supports {@link PageRankWithKryoSimpleWritable}
219    */
220   public static class PageRankWithKryoVertexInputFormat extends
221           GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> {
222     @Override
223     public VertexReader<LongWritable, VertexValue,
224             EdgeValue> createVertexReader(InputSplit split,
225                                               TaskAttemptContext context)
226             throws IOException {
227       return new PageRankWithKryoVertexReader();
228     }
229   }
230 
231   /**
232    * Creating a custom vertex value class to force kryo to
233    * register with a new ID. Please note that a custom
234    * class containing a double array is not
235    * necessary for the page rank application. It is only
236    * used for testing the scenario of kryo encountering an
237    * unregistered custom class.
238    */
239   public static class VertexValue extends KryoSimpleWritable {
240     /** Storing the value in an array.
241         Double array is an unregistered type
242         hence kryo will assign a unique class id */
243     private double[] ranks;
244 
245     /** Constructor */
246     public VertexValue() {
247     }
248 
249     /**
250      * Constructor
251      * @param val Vertex value
252      */
253     public VertexValue(Double val) {
254       ranks = new double[1];
255 
256       ranks[0] = val;
257     }
258 
259     /**
260      * Get vertex value
261      * @return Vertex value
262      */
263     public Double get() {
264       return ranks[0];
265     }
266 
267     /**
268      * Set vertex value.
269      * @param val Vertex value
270      */
271     public void set(Double val) {
272       this.ranks[0] = val;
273     }
274   }
275 
276   /**
277    * Creating a custom edge value class to force kryo to
278    * register with a new ID. Please note that a custom
279    * class containing a float is not
280    * necessary for the page rank application. It is only
281    * used for testing the scenario of kryo encountering an
282    * unregistered custom class.
283    */
284   public static class EdgeValue extends KryoSimpleWritable {
285     /** Edge value */
286     private Float realValue;
287 
288     /** Constructor */
289     public EdgeValue() {
290     }
291     /**
292      * Constructor
293      * @param val Edge value
294      */
295     public EdgeValue(Float val) {
296       realValue = val;
297     }
298 
299     /**
300      * Get edge value
301      * @return Edge value
302      */
303     public Float get() {
304       return realValue;
305     }
306 
307     /**
308      * Set edge value
309      * @param val Edge value
310      */
311     public void set(Float val) {
312       this.realValue = val;
313     }
314   }
315 
316   /**
317    * Creating a custom message value class to force kryo to
318    * register with a new ID. Please note that a custom
319    * class containing a double list is not
320    * necessary for the page rank application. It is only
321    * used for testing the scenario of kryo encountering an
322    * unregistered custom class.
323    */
324   public static class MessageValue extends KryoSimpleWritable {
325     /** Storing the message in a list to test the list type */
326     private List<Double> msgValue;
327 
328     /** Constructor */
329     public MessageValue() {
330     }
331 
332     /**
333      * Constructor
334      * @param val Message value
335      */
336     public MessageValue(Double val) {
337       msgValue = new ArrayList<>();
338       msgValue.add(val);
339     }
340 
341     /**
342      * Get message value
343      * @return Message value
344      */
345     public Double get() {
346       return msgValue.get(0);
347     }
348 
349     /**
350      * Set message value
351      * @param val Message value
352      */
353     public void set(Double val) {
354       this.msgValue.set(0, val);
355     }
356   }
357 
358 
359   /**
360    * Aggregator for getting max double value
361    */
362   public static class DoubleMaxWrapperAggregator extends
363           BasicAggregator<VertexValue> {
364     @Override
365     public void aggregate(VertexValue value) {
366       getAggregatedValue().set(
367                       Math.max(getAggregatedValue().get(), value.get()));
368     }
369 
370     @Override
371     public VertexValue createInitialValue() {
372       return new VertexValue(Double.NEGATIVE_INFINITY);
373     }
374   }
375 
376   /**
377    * Aggregator for getting min double value.
378    */
379   public static class DoubleMinWrapperAggregator
380           extends BasicAggregator<VertexValue> {
381     @Override
382     public void aggregate(VertexValue value) {
383       getAggregatedValue().set(
384               Math.min(getAggregatedValue().get(), value.get()));
385     }
386 
387     @Override
388     public VertexValue createInitialValue() {
389       return  new VertexValue(Double.MAX_VALUE);
390     }
391   }
392 
393 }