1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.graph;
2021import java.io.IOException;
22import java.util.Iterator;
2324import org.apache.giraph.bsp.CentralizedServiceWorker;
25import org.apache.giraph.comm.WorkerClientRequestProcessor;
26import org.apache.giraph.edge.Edge;
27import org.apache.giraph.edge.OutEdges;
28import org.apache.giraph.worker.AllWorkersInfo;
29import org.apache.giraph.worker.WorkerAggregatorDelegator;
30import org.apache.giraph.worker.WorkerContext;
31import org.apache.giraph.worker.WorkerGlobalCommUsage;
32import org.apache.hadoop.io.Writable;
33import org.apache.hadoop.io.WritableComparable;
34import org.apache.hadoop.mapreduce.Mapper;
3536/**37 * See {@link Computation} for explanation of the interface.38 *39 * This is a abstract class helper for users to implement their computations.40 * It implements all of the methods required by the {@link Computation}41 * interface except for the {@link #compute(Vertex, Iterable)} which we leave42 * to the user to define.43 *44 * In most cases users should inherit from this class when implementing their45 * algorithms with Giraph.46 *47 * @param <I> Vertex id48 * @param <V> Vertex data49 * @param <E> Edge data50 * @param <M1> Incoming message type51 * @param <M2> Outgoing message type52 */53publicabstractclass AbstractComputation<I extends WritableComparable,
54 V extends Writable, E extends Writable, M1 extends Writable,
55 M2 extends Writable>
56extends WorkerAggregatorDelegator<I, V, E>
57implements Computation<I, V, E, M1, M2> {
58/** Global graph state **/59privateGraphState graphState;
60/** Handles requests */61private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
62/** Service worker */63private CentralizedServiceWorker<I, V, E> serviceWorker;
64/** Worker context */65privateWorkerContext workerContext;
66/** All workers info */67privateAllWorkersInfo allWorkersInfo;
6869/**70 * Must be defined by user to do computation on a single Vertex.71 *72 * @param vertex Vertex73 * @param messages Messages that were sent to this vertex in the previous74 * superstep. Each message is only guaranteed to have75 * a life expectancy as long as next() is not called.76 */77 @Override
78publicabstractvoid compute(Vertex<I, V, E> vertex,
79 Iterable<M1> messages) throws IOException;
8081/**82 * Prepare for computation. This method is executed exactly once prior to83 * {@link #compute(Vertex, Iterable)} being called for any of the vertices84 * in the partition.85 */86 @Override
87publicvoid preSuperstep() {
88 }
8990/**91 * Finish computation. This method is executed exactly once after computation92 * for all vertices in the partition is complete.93 */94 @Override
95publicvoid postSuperstep() {
96 }
9798/**99 * Initialize, called by infrastructure before the superstep starts.100 * Shouldn't be called by user code.101 *102 * @param graphState Graph state103 * @param workerClientRequestProcessor Processor for handling requests104 * @param serviceWorker Graph-wide BSP Mapper for this Vertex105 * @param workerGlobalCommUsage Worker global communication usage106 */107 @Override
108publicvoid initialize(
109GraphState graphState,
110 WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
111 CentralizedServiceWorker<I, V, E> serviceWorker,
112WorkerGlobalCommUsage workerGlobalCommUsage) {
113this.graphState = graphState;
114this.workerClientRequestProcessor = workerClientRequestProcessor;
115this.setWorkerGlobalCommUsage(workerGlobalCommUsage);
116this.serviceWorker = serviceWorker;
117if (serviceWorker != null) {
118this.workerContext = serviceWorker.getWorkerContext();
119this.allWorkersInfo = newAllWorkersInfo(
120 serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
121 } else {
122this.workerContext = null;
123this.allWorkersInfo = null;
124 }
125 }
126127/**128 * Retrieves the current superstep.129 *130 * @return Current superstep131 */132 @Override
133publiclong getSuperstep() {
134return graphState.getSuperstep();
135 }
136137/**138 * Get the total (all workers) number of vertices that139 * existed in the previous superstep.140 *141 * @return Total number of vertices (-1 if first superstep)142 */143 @Override
144publiclong getTotalNumVertices() {
145return graphState.getTotalNumVertices();
146 }
147148/**149 * Get the total (all workers) number of edges that150 * existed in the previous superstep.151 *152 * @return Total number of edges (-1 if first superstep)153 */154 @Override
155publiclong getTotalNumEdges() {
156return graphState.getTotalNumEdges();
157 }
158159/**160 * Send a message to a vertex id.161 *162 * @param id Vertex id to send the message to163 * @param message Message data to send164 */165 @Override
166publicvoid sendMessage(I id, M2 message) {
167 workerClientRequestProcessor.sendMessageRequest(id, message);
168 }
169170/**171 * Send a message to all edges.172 *173 * @param vertex Vertex whose edges to send the message to.174 * @param message Message sent to all edges.175 */176 @Override
177publicvoid sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
178 workerClientRequestProcessor.sendMessageToAllRequest(vertex, message);
179 }
180181/**182 * Send a message to multiple target vertex ids in the iterator.183 *184 * @param vertexIdIterator An iterator to multiple target vertex ids.185 * @param message Message sent to all targets in the iterator.186 */187 @Override
188publicvoid sendMessageToMultipleEdges(
189 Iterator<I> vertexIdIterator, M2 message) {
190 workerClientRequestProcessor.sendMessageToAllRequest(
191 vertexIdIterator, message);
192 }
193194/**195 * Sends a request to create a vertex that will be available during the196 * next superstep.197 *198 * @param id Vertex id199 * @param value Vertex value200 * @param edges Initial edges201 */202 @Override
203publicvoid addVertexRequest(I id, V value,
204 OutEdges<I, E> edges) throws IOException {
205 Vertex<I, V, E> vertex = getConf().createVertex();
206 vertex.initialize(id, value, edges);
207 workerClientRequestProcessor.addVertexRequest(vertex);
208 }
209210/**211 * Sends a request to create a vertex that will be available during the212 * next superstep.213 *214 * @param id Vertex id215 * @param value Vertex value216 */217 @Override
218publicvoid addVertexRequest(I id, V value) throws IOException {
219 addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
220 }
221222/**223 * Request to remove a vertex from the graph224 * (applied just prior to the next superstep).225 *226 * @param vertexId Id of the vertex to be removed.227 */228 @Override
229publicvoid removeVertexRequest(I vertexId) throws IOException {
230 workerClientRequestProcessor.removeVertexRequest(vertexId);
231 }
232233/**234 * Request to add an edge of a vertex in the graph235 * (processed just prior to the next superstep)236 *237 * @param sourceVertexId Source vertex id of edge238 * @param edge Edge to add239 */240 @Override
241publicvoid addEdgeRequest(I sourceVertexId,
242 Edge<I, E> edge) throws IOException {
243 workerClientRequestProcessor.addEdgeRequest(sourceVertexId, edge);
244 }
245246/**247 * Request to remove all edges from a given source vertex to a given target248 * vertex (processed just prior to the next superstep).249 *250 * @param sourceVertexId Source vertex id251 * @param targetVertexId Target vertex id252 */253 @Override
254publicvoid removeEdgesRequest(I sourceVertexId,
255 I targetVertexId) throws IOException {
256 workerClientRequestProcessor.removeEdgesRequest(
257 sourceVertexId, targetVertexId);
258 }
259260/**261 * Get the mapper context262 *263 * @return Mapper context264 */265 @Override
266public Mapper.Context getContext() {
267return graphState.getContext();
268 }
269270/**271 * Get the worker context272 *273 * @param <W> WorkerContext class274 * @return WorkerContext context275 */276 @SuppressWarnings("unchecked")
277 @Override
278public <W extends WorkerContext> W getWorkerContext() {
279return (W) workerContext;
280 }
281282 @Override
283publicfinalint getWorkerCount() {
284return allWorkersInfo.getWorkerCount();
285 }
286287 @Override
288publicfinalint getMyWorkerIndex() {
289return allWorkersInfo.getMyWorkerIndex();
290 }
291292 @Override
293publicfinalint getWorkerForVertex(I vertexId) {
294return allWorkersInfo.getWorkerIndex(
295 serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
296 }
297 }