View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  
23  import org.apache.giraph.bsp.CentralizedServiceWorker;
24  import org.apache.giraph.comm.WorkerClientRequestProcessor;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
26  import org.apache.giraph.conf.TypesHolder;
27  import org.apache.giraph.edge.Edge;
28  import org.apache.giraph.edge.OutEdges;
29  import org.apache.giraph.worker.WorkerAggregatorUsage;
30  import org.apache.giraph.worker.WorkerContext;
31  import org.apache.giraph.worker.WorkerGlobalCommUsage;
32  import org.apache.giraph.worker.WorkerIndexUsage;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.mapreduce.Mapper;
36  
37  /**
38   * Interface through which an application defines its vertex computation.
39   *
40   * During a superstep there can be several instances of this interface,
41   * each doing computation on one partition of the graph's vertices.
42   *
43   * Note that each thread will have its own {@link Computation},
44   * so accessing any data from this class is thread-safe.
45   * However, accessing global data (like data from {@link WorkerContext})
46   * is not thread-safe.
47   *
48   * Objects of this interface only live for a single superstep.
49   *
50   * @param <I> Vertex id type
51   * @param <V> Vertex data type
52   * @param <E> Edge data type
53   * @param <M1> Incoming message type
54   * @param <M2> Outgoing message type
55   */
56  public interface Computation<I extends WritableComparable,
57      V extends Writable, E extends Writable, M1 extends Writable,
58      M2 extends Writable>
59      extends TypesHolder<I, V, E, M1, M2>,
60      ImmutableClassesGiraphConfigurable<I, V, E>,
61      WorkerGlobalCommUsage, WorkerAggregatorUsage, WorkerIndexUsage<I> {
62    /**
63     * Must be defined by the user to do computation on a single vertex.
64     *
65     * @param vertex   Vertex to compute on
66     * @param messages Messages that were sent to this vertex in the previous
67     *                 superstep.  Each message is only guaranteed to remain
68     *                 valid until the next call to next().
69     */
70    void compute(Vertex<I, V, E> vertex, Iterable<M1> messages)
71      throws IOException;
72  
73    /**
74     * Prepare for computation. This method is executed exactly once, before
75     * {@link #compute(Vertex, Iterable)} is called for any of the vertices
76     * in the partition.
77     */
78    void preSuperstep();
79  
80    /**
81     * Finish computation. This method is executed exactly once, after the
82     * computation for all vertices in the partition is complete.
83     */
84    void postSuperstep();
85  
86    /**
87     * Initialize this object; called by the infrastructure before the
88     * superstep starts. Must not be called by user code.
89     *
90     * @param graphState Graph state
91     * @param workerClientRequestProcessor Processor for handling requests
92     * @param serviceWorker Centralized service worker
93     * @param workerGlobalCommUsage Worker global communication usage
94     */
95    void initialize(GraphState graphState,
96        WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
97        CentralizedServiceWorker<I, V, E> serviceWorker,
98        WorkerGlobalCommUsage workerGlobalCommUsage);
99  
100   /**
101    * Retrieve the current superstep.
102    *
103    * @return Current superstep
104    */
105   long getSuperstep();
106 
107   /**
108    * Get the total number of vertices (across all workers) that
109    * existed in the previous superstep.
110    *
111    * @return Total number of vertices (-1 if first superstep)
112    */
113   long getTotalNumVertices();
114 
115   /**
116    * Get the total number of edges (across all workers) that
117    * existed in the previous superstep.
118    *
119    * @return Total number of edges (-1 if first superstep)
120    */
121   long getTotalNumEdges();
122 
123   /**
124    * Send a message to the vertex with the given id.
125    *
126    * @param id Vertex id to send the message to
127    * @param message Message data to send
128    */
129   void sendMessage(I id, M2 message);
130 
131   /**
132    * Send a message to all edges of the given vertex.
133    *
134    * @param vertex Vertex whose edges to send the message to.
135    * @param message Message sent to all edges.
136    */
137   void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message);
138 
139   /**
140    * Send a message to every target vertex id supplied by the iterator.
141    *
142    * @param vertexIdIterator Iterator over the target vertex ids.
143    * @param message Message sent to all targets in the iterator.
144    */
145   void sendMessageToMultipleEdges(Iterator<I> vertexIdIterator, M2 message);
146 
147   /**
148    * Send a request to create a vertex, with the given initial edges, that
149    * will be available during the next superstep.
150    *
151    * @param id Vertex id
152    * @param value Vertex value
153    * @param edges Initial edges
154    */
155   void addVertexRequest(I id, V value, OutEdges<I, E> edges) throws IOException;
156 
157   /**
158    * Send a request to create a vertex that will be available during the
159    * next superstep.
160    *
161    * @param id Vertex id
162    * @param value Vertex value
163    */
164   void addVertexRequest(I id, V value) throws IOException;
165 
166   /**
167    * Request to remove a vertex from the graph
168    * (applied just prior to the next superstep).
169    *
170    * @param vertexId Id of the vertex to be removed.
171    */
172   void removeVertexRequest(I vertexId) throws IOException;
173 
174   /**
175    * Request to add an edge to a vertex in the graph
176    * (processed just prior to the next superstep).
177    *
178    * @param sourceVertexId Source vertex id of edge
179    * @param edge Edge to add
180    */
181   void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) throws IOException;
182 
183   /**
184    * Request to remove all edges from a given source vertex to a given target
185    * vertex (processed just prior to the next superstep).
186    *
187    * @param sourceVertexId Source vertex id
188    * @param targetVertexId Target vertex id
189    */
190   void removeEdgesRequest(I sourceVertexId, I targetVertexId)
191     throws IOException;
192 
193   /**
194    * Get the mapper context.
195    *
196    * @return Mapper context
197    */
198   Mapper.Context getContext();
199 
200   /**
201    * Get the worker context, typed as the caller expects.
202    *
203    * @param <W> WorkerContext class expected by the caller
204    * @return Worker context, typed as W
205    */
206   @SuppressWarnings("unchecked")
207   <W extends WorkerContext> W getWorkerContext();
208 }