This project has retired. For details please refer to its Attic page.
SimpleShortestPathsComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.conf.LongConfOption;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.hadoop.io.DoubleWritable;
26  import org.apache.hadoop.io.FloatWritable;
27  import org.apache.hadoop.io.LongWritable;
28  import org.apache.log4j.Logger;
29  
30  import java.io.IOException;
31  
32  /**
33   * Demonstrates the basic Pregel shortest paths implementation.
34   */
35  @Algorithm(
36      name = "Shortest paths",
37      description = "Finds all shortest paths from a selected vertex"
38  )
39  public class SimpleShortestPathsComputation extends BasicComputation<
40      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
41    /** The shortest paths id */
42    public static final LongConfOption SOURCE_ID =
43        new LongConfOption("SimpleShortestPathsVertex.sourceId", 1,
44            "The shortest paths id");
45    /** Class logger */
46    private static final Logger LOG =
47        Logger.getLogger(SimpleShortestPathsComputation.class);
48  
49    /**
50     * Is this vertex the source id?
51     *
52     * @param vertex Vertex
53     * @return True if the source id
54     */
55    private boolean isSource(Vertex<LongWritable, ?, ?> vertex) {
56      return vertex.getId().get() == SOURCE_ID.get(getConf());
57    }
58  
59    @Override
60    public void compute(
61        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
62        Iterable<DoubleWritable> messages) throws IOException {
63      if (getSuperstep() == 0) {
64        vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
65      }
66      double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
67      for (DoubleWritable message : messages) {
68        minDist = Math.min(minDist, message.get());
69      }
70      if (LOG.isDebugEnabled()) {
71        LOG.debug("Vertex " + vertex.getId() + " got minDist = " + minDist +
72            " vertex value = " + vertex.getValue());
73      }
74      if (minDist < vertex.getValue().get()) {
75        vertex.setValue(new DoubleWritable(minDist));
76        for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
77          double distance = minDist + edge.getValue().get();
78          if (LOG.isDebugEnabled()) {
79            LOG.debug("Vertex " + vertex.getId() + " sent to " +
80                edge.getTargetVertexId() + " = " + distance);
81          }
82          sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
83        }
84      }
85      vertex.voteToHalt();
86    }
87  }