This project has retired. For details please refer to its Attic page.
SimpleMutateGraphComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.edge.EdgeFactory;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.worker.WorkerContext;
25  import org.apache.hadoop.io.DoubleWritable;
26  import org.apache.hadoop.io.FloatWritable;
27  import org.apache.hadoop.io.LongWritable;
28  import org.apache.log4j.Logger;
29  
30  import java.io.IOException;
31  
32  /**
33   * Vertex to allow unit testing of graph mutations.
34   */
35  public class SimpleMutateGraphComputation extends BasicComputation<
36      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
37    /** Class logger */
38    private static Logger LOG =
39        Logger.getLogger(SimpleMutateGraphComputation.class);
40    /** Maximum number of ranges for vertex ids */
41    private long maxRanges = 100;
42  
43  
44    /**
45     * Unless we create a ridiculous number of vertices , we should not
46     * collide within a vertex range defined by this method.
47     *
48     * @param range Range index
49     * @return Starting vertex id of the range
50     */
51    private long rangeVertexIdStart(int range) {
52      return (Long.MAX_VALUE / maxRanges) * range;
53    }
54  
55    @Override
56    public void compute(
57        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
58        Iterable<DoubleWritable> messages) throws IOException {
59      SimpleMutateGraphVertexWorkerContext workerContext = getWorkerContext();
60      if (getSuperstep() == 0) {
61        LOG.debug("Reached superstep " + getSuperstep());
62      } else if (getSuperstep() == 1) {
63        // Send messages to vertices that are sure not to exist
64        // (creating them)
65        LongWritable destVertexId =
66            new LongWritable(rangeVertexIdStart(1) + vertex.getId().get());
67        sendMessage(destVertexId, new DoubleWritable(0.0));
68      } else if (getSuperstep() == 2) {
69        LOG.debug("Reached superstep " + getSuperstep());
70      } else if (getSuperstep() == 3) {
71        long vertexCount = workerContext.getVertexCount();
72        if (vertexCount * 2 != getTotalNumVertices()) {
73          throw new IllegalStateException(
74              "Impossible to have " + getTotalNumVertices() +
75              " vertices when should have " + vertexCount * 2 +
76              " on superstep " + getSuperstep());
77        }
78        long edgeCount = workerContext.getEdgeCount();
79        if (edgeCount != getTotalNumEdges()) {
80          throw new IllegalStateException(
81              "Impossible to have " + getTotalNumEdges() +
82              " edges when should have " + edgeCount +
83              " on superstep " + getSuperstep());
84        }
85        // Create vertices that are sure not to exist (doubling vertices)
86        LongWritable vertexIndex =
87            new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
88        addVertexRequest(vertexIndex, new DoubleWritable(0.0));
89        // Add edges to those remote vertices as well
90        addEdgeRequest(vertexIndex,
91            EdgeFactory.create(vertex.getId(), new FloatWritable(0.0f)));
92      } else if (getSuperstep() == 4) {
93        LOG.debug("Reached superstep " + getSuperstep());
94      } else if (getSuperstep() == 5) {
95        long vertexCount = workerContext.getVertexCount();
96        if (vertexCount * 2 != getTotalNumVertices()) {
97          throw new IllegalStateException(
98              "Impossible to have " + getTotalNumVertices() +
99              " when should have " + vertexCount * 2 +
100             " on superstep " + getSuperstep());
101       }
102       long edgeCount = workerContext.getEdgeCount();
103       if (edgeCount + vertexCount != getTotalNumEdges()) {
104         throw new IllegalStateException(
105             "Impossible to have " + getTotalNumEdges() +
106             " edges when should have " + edgeCount + vertexCount +
107             " on superstep " + getSuperstep());
108       }
109       // Remove the edges created in superstep 3
110       LongWritable vertexIndex =
111           new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
112       workerContext.increaseEdgesRemoved();
113       removeEdgesRequest(vertexIndex, vertex.getId());
114     } else if (getSuperstep() == 6) {
115       // Remove all the vertices created in superstep 3
116       if (vertex.getId().compareTo(
117           new LongWritable(rangeVertexIdStart(3))) >= 0) {
118         removeVertexRequest(vertex.getId());
119       }
120     } else if (getSuperstep() == 7) {
121       long origEdgeCount = workerContext.getOrigEdgeCount();
122       if (origEdgeCount != getTotalNumEdges()) {
123         throw new IllegalStateException(
124             "Impossible to have " + getTotalNumEdges() +
125             " edges when should have " + origEdgeCount +
126             " on superstep " + getSuperstep());
127       }
128     } else if (getSuperstep() == 8) {
129       long vertexCount = workerContext.getVertexCount();
130       if (vertexCount / 2 != getTotalNumVertices()) {
131         throw new IllegalStateException(
132             "Impossible to have " + getTotalNumVertices() +
133             " vertices when should have " + vertexCount / 2 +
134             " on superstep " + getSuperstep());
135       }
136     } else if (getSuperstep() == 9) {
137       // Remove all the vertices created in superstep 1, and send a message to
138       // them at the same time
139       if (vertex.getId().compareTo(
140           new LongWritable(rangeVertexIdStart(1))) >= 0) {
141         // This is an added vertex, so remove it
142         removeVertexRequest(vertex.getId());
143       } else {
144         // This is a vertex since the start of the computation, so send a
145         // message to a vertex added in superstep 1
146         sendMessage(
147             new LongWritable(rangeVertexIdStart(1) + vertex.getId().get()),
148             new DoubleWritable(0.0));
149       }
150     } else if (getSuperstep() == 10) {
151       LOG.debug("Reached superstep " + getSuperstep());
152     } else if (getSuperstep() == 11) {
153       long vertexCount = workerContext.getVertexCount();
154       if (vertexCount / 2 != getTotalNumVertices()) {
155         throw new IllegalStateException(
156             "Impossible to have " + getTotalNumVertices() +
157                 " vertices when should have " + vertexCount / 2 +
158                 " on superstep " + getSuperstep());
159       }
160     } else {
161       vertex.voteToHalt();
162     }
163   }
164 
165   /**
166    * Worker context used with {@link SimpleMutateGraphComputation}.
167    */
168   public static class SimpleMutateGraphVertexWorkerContext
169       extends WorkerContext {
170     /** Cached vertex count */
171     private long vertexCount;
172     /** Cached edge count */
173     private long edgeCount;
174     /** Original number of edges */
175     private long origEdgeCount;
176     /** Number of edges removed during superstep */
177     private int edgesRemoved = 0;
178 
179     @Override
180     public void preApplication()
181       throws InstantiationException, IllegalAccessException { }
182 
183     @Override
184     public void postApplication() { }
185 
186     @Override
187     public void preSuperstep() { }
188 
189     @Override
190     public void postSuperstep() {
191       vertexCount = getTotalNumVertices();
192       edgeCount = getTotalNumEdges();
193       if (getSuperstep() == 1) {
194         origEdgeCount = edgeCount;
195       }
196       LOG.info("Got " + vertexCount + " vertices, " +
197           edgeCount + " edges on superstep " +
198           getSuperstep());
199       LOG.info("Removed " + edgesRemoved);
200       edgesRemoved = 0;
201     }
202 
203     public long getVertexCount() {
204       return vertexCount;
205     }
206 
207     public long getEdgeCount() {
208       return edgeCount;
209     }
210 
211     public long getOrigEdgeCount() {
212       return origEdgeCount;
213     }
214 
215     /**
216      * Increase the number of edges removed by one.
217      */
218     public void increaseEdgesRemoved() {
219       this.edgesRemoved++;
220     }
221   }
222 }