This project has retired. For details please refer to its Attic page.
AbstractComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  
24  import org.apache.giraph.bsp.CentralizedServiceWorker;
25  import org.apache.giraph.comm.WorkerClientRequestProcessor;
26  import org.apache.giraph.edge.Edge;
27  import org.apache.giraph.edge.OutEdges;
28  import org.apache.giraph.worker.AllWorkersInfo;
29  import org.apache.giraph.worker.WorkerAggregatorDelegator;
30  import org.apache.giraph.worker.WorkerContext;
31  import org.apache.giraph.worker.WorkerGlobalCommUsage;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  /**
37   * See {@link Computation} for explanation of the interface.
38   *
39   * This is a abstract class helper for users to implement their computations.
40   * It implements all of the methods required by the {@link Computation}
41   * interface except for the {@link #compute(Vertex, Iterable)} which we leave
42   * to the user to define.
43   *
44   * In most cases users should inherit from this class when implementing their
45   * algorithms with Giraph.
46   *
47   * @param <I> Vertex id
48   * @param <V> Vertex data
49   * @param <E> Edge data
50   * @param <M1> Incoming message type
51   * @param <M2> Outgoing message type
52   */
53  public abstract class AbstractComputation<I extends WritableComparable,
54      V extends Writable, E extends Writable, M1 extends Writable,
55      M2 extends Writable>
56      extends WorkerAggregatorDelegator<I, V, E>
57      implements Computation<I, V, E, M1, M2> {
58    /** Global graph state **/
59    private GraphState graphState;
60    /** Handles requests */
61    private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
62    /** Service worker */
63    private CentralizedServiceWorker<I, V, E> serviceWorker;
64    /** Worker context */
65    private WorkerContext workerContext;
66    /** All workers info */
67    private AllWorkersInfo allWorkersInfo;
68  
69    /**
70     * Must be defined by user to do computation on a single Vertex.
71     *
72     * @param vertex   Vertex
73     * @param messages Messages that were sent to this vertex in the previous
74     *                 superstep.  Each message is only guaranteed to have
75     *                 a life expectancy as long as next() is not called.
76     */
77    @Override
78    public abstract void compute(Vertex<I, V, E> vertex,
79        Iterable<M1> messages) throws IOException;
80  
81    /**
82     * Prepare for computation. This method is executed exactly once prior to
83     * {@link #compute(Vertex, Iterable)} being called for any of the vertices
84     * in the partition.
85     */
86    @Override
87    public void preSuperstep() {
88    }
89  
90    /**
91     * Finish computation. This method is executed exactly once after computation
92     * for all vertices in the partition is complete.
93     */
94    @Override
95    public void postSuperstep() {
96    }
97  
98    /**
99     * Initialize, called by infrastructure before the superstep starts.
100    * Shouldn't be called by user code.
101    *
102    * @param graphState Graph state
103    * @param workerClientRequestProcessor Processor for handling requests
104    * @param serviceWorker Graph-wide BSP Mapper for this Vertex
105    * @param workerGlobalCommUsage Worker global communication usage
106    */
107   @Override
108   public void initialize(
109       GraphState graphState,
110       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
111       CentralizedServiceWorker<I, V, E> serviceWorker,
112       WorkerGlobalCommUsage workerGlobalCommUsage) {
113     this.graphState = graphState;
114     this.workerClientRequestProcessor = workerClientRequestProcessor;
115     this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
116     this.serviceWorker = serviceWorker;
117     if (serviceWorker != null) {
118       this.workerContext = serviceWorker.getWorkerContext();
119       this.allWorkersInfo = new AllWorkersInfo(
120           serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
121     } else {
122       this.workerContext = null;
123       this.allWorkersInfo = null;
124     }
125   }
126 
127   /**
128    * Retrieves the current superstep.
129    *
130    * @return Current superstep
131    */
132   @Override
133   public long getSuperstep() {
134     return graphState.getSuperstep();
135   }
136 
137   /**
138    * Get the total (all workers) number of vertices that
139    * existed in the previous superstep.
140    *
141    * @return Total number of vertices (-1 if first superstep)
142    */
143   @Override
144   public long getTotalNumVertices() {
145     return graphState.getTotalNumVertices();
146   }
147 
148   /**
149    * Get the total (all workers) number of edges that
150    * existed in the previous superstep.
151    *
152    * @return Total number of edges (-1 if first superstep)
153    */
154   @Override
155   public long getTotalNumEdges() {
156     return graphState.getTotalNumEdges();
157   }
158 
159   /**
160    * Send a message to a vertex id.
161    *
162    * @param id Vertex id to send the message to
163    * @param message Message data to send
164    */
165   @Override
166   public void sendMessage(I id, M2 message) {
167     workerClientRequestProcessor.sendMessageRequest(id, message);
168   }
169 
170   /**
171    * Send a message to all edges.
172    *
173    * @param vertex Vertex whose edges to send the message to.
174    * @param message Message sent to all edges.
175    */
176   @Override
177   public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
178     workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
179   }
180 
181   /**
182    * Send a message to multiple target vertex ids in the iterator.
183    *
184    * @param vertexIdIterator An iterator to multiple target vertex ids.
185    * @param message Message sent to all targets in the iterator.
186    */
187   @Override
188   public void sendMessageToMultipleEdges(
189       Iterator<I> vertexIdIterator, M2 message) {
190     workerClientRequestProcessor.sendMessageToAllRequest(
191         vertexIdIterator, message);
192   }
193 
194   /**
195    * Sends a request to create a vertex that will be available during the
196    * next superstep.
197    *
198    * @param id Vertex id
199    * @param value Vertex value
200    * @param edges Initial edges
201    */
202   @Override
203   public void addVertexRequest(I id, V value,
204       OutEdges<I, E> edges) throws IOException {
205     Vertex<I, V, E> vertex = getConf().createVertex();
206     vertex.initialize(id, value, edges);
207     workerClientRequestProcessor.addVertexRequest(vertex);
208   }
209 
210   /**
211    * Sends a request to create a vertex that will be available during the
212    * next superstep.
213    *
214    * @param id Vertex id
215    * @param value Vertex value
216    */
217   @Override
218   public void addVertexRequest(I id, V value) throws IOException {
219     addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
220   }
221 
222   /**
223    * Request to remove a vertex from the graph
224    * (applied just prior to the next superstep).
225    *
226    * @param vertexId Id of the vertex to be removed.
227    */
228   @Override
229   public void removeVertexRequest(I vertexId) throws IOException {
230     workerClientRequestProcessor.removeVertexRequest(vertexId);
231   }
232 
233   /**
234    * Request to add an edge of a vertex in the graph
235    * (processed just prior to the next superstep)
236    *
237    * @param sourceVertexId Source vertex id of edge
238    * @param edge Edge to add
239    */
240   @Override
241   public void addEdgeRequest(I sourceVertexId,
242       Edge<I, E> edge) throws IOException {
243     workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
244   }
245 
246   /**
247    * Request to remove all edges from a given source vertex to a given target
248    * vertex (processed just prior to the next superstep).
249    *
250    * @param sourceVertexId Source vertex id
251    * @param targetVertexId Target vertex id
252    */
253   @Override
254   public void removeEdgesRequest(I sourceVertexId,
255       I targetVertexId) throws IOException {
256     workerClientRequestProcessor.removeEdgesRequest(
257         sourceVertexId, targetVertexId);
258   }
259 
260   /**
261    * Get the mapper context
262    *
263    * @return Mapper context
264    */
265   @Override
266   public Mapper.Context getContext() {
267     return graphState.getContext();
268   }
269 
270   /**
271    * Get the worker context
272    *
273    * @param <W> WorkerContext class
274    * @return WorkerContext context
275    */
276   @SuppressWarnings("unchecked")
277   @Override
278   public <W extends WorkerContext> W getWorkerContext() {
279     return (W) workerContext;
280   }
281 
282   @Override
283   public final int getWorkerCount() {
284     return allWorkersInfo.getWorkerCount();
285   }
286 
287   @Override
288   public final int getMyWorkerIndex() {
289     return allWorkersInfo.getMyWorkerIndex();
290   }
291 
292   @Override
293   public final int getWorkerForVertex(I vertexId) {
294     return allWorkersInfo.getWorkerIndex(
295         serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
296   }
297 }