View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.bsp;
20  
21  import org.apache.giraph.comm.ServerData;
22  import org.apache.giraph.comm.WorkerClient;
23  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
24  import org.apache.giraph.graph.FinishedSuperstepStats;
25  import org.apache.giraph.graph.GlobalStats;
26  import org.apache.giraph.graph.GraphTaskManager;
27  import org.apache.giraph.graph.VertexEdgeCount;
28  import org.apache.giraph.io.superstep_output.SuperstepOutput;
29  import org.apache.giraph.metrics.GiraphTimerContext;
30  import org.apache.giraph.partition.PartitionOwner;
31  import org.apache.giraph.partition.PartitionStats;
32  import org.apache.giraph.partition.PartitionStore;
33  import org.apache.giraph.worker.WorkerInputSplitsHandler;
34  import org.apache.giraph.worker.WorkerAggregatorHandler;
35  import org.apache.giraph.worker.WorkerContext;
36  import org.apache.giraph.worker.WorkerInfo;
37  import org.apache.giraph.worker.WorkerObserver;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  
41  import java.io.IOException;
42  import java.util.Collection;
43  import java.util.List;
44  
45  /**
46   * All workers should have access to this centralized service to
47   * execute the following methods.
48   *
49   * @param <I> Vertex id
50   * @param <V> Vertex value
51   * @param <E> Edge value
52   */
53  @SuppressWarnings("rawtypes")
54  public interface CentralizedServiceWorker<I extends WritableComparable,
55    V extends Writable, E extends Writable>
56    extends CentralizedService<I, V, E> {
57    /**
58     * Setup (must be called prior to any other function)
59     *
60     * @return Finished superstep stats for the input superstep
61     */
62    FinishedSuperstepStats setup();
63  
64    /**
65     * Get the worker information
66     *
67     * @return Worker information
68     */
69    WorkerInfo getWorkerInfo();
70  
71    /**
72     * Get the worker client (for instantiating WorkerClientRequestProcessor
73     * instances.
74     *
75     * @return Worker client
76     */
77    WorkerClient<I, V, E> getWorkerClient();
78  
79    /**
80     * Get the worker context.
81     *
82     * @return worker's WorkerContext
83     */
84    WorkerContext getWorkerContext();
85  
86    /**
87     * Get the observers for this Worker.
88     *
89     * @return array of WorkerObservers.
90     */
91    WorkerObserver[] getWorkerObservers();
92  
93    /**
94     * Get the partition store for this worker.
95     * The partitions contain the vertices for
96     * this worker and can be used to run compute() for the vertices or do
97     * checkpointing.
98     *
99     * @return The partition store for this worker.
100    */
101   PartitionStore<I, V, E> getPartitionStore();
102 
103   /**
104    *  Both the vertices and the messages need to be checkpointed in order
105    *  for them to be used.  This is done after all messages have been
106    *  delivered, but prior to a superstep starting.
107    */
108   void storeCheckpoint() throws IOException;
109 
110   /**
111    * Load the vertices, edges, messages from the beginning of a superstep.
112    * Will load the vertex partitions as designated by the master and set the
113    * appropriate superstep.
114    *
115    * @param superstep which checkpoint to use
116    * @return Graph-wide vertex and edge counts
117    * @throws IOException
118    */
119   VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
120 
121   /**
122    * Take all steps prior to actually beginning the computation of a
123    * superstep.
124    *
125    * @return Collection of all the partition owners from the master for this
126    *         superstep.
127    */
128   Collection<? extends PartitionOwner> startSuperstep();
129 
130   /**
131    * Worker is done with its portion of the superstep.  Report the
132    * worker level statistics after the computation.
133    *
134    * @param partitionStatsList All the partition stats for this worker
135    * @param superstepTimerContext superstep timer context only given when the
136    *      function needs to stop the timer, otherwise null.
137    * @return Stats of the superstep completion
138    */
139   FinishedSuperstepStats finishSuperstep(
140       List<PartitionStats> partitionStatsList,
141       GiraphTimerContext superstepTimerContext);
142 
143   /**
144    * Get the partition id that a vertex id would belong to.
145    *
146    * @param vertexId Vertex id
147    * @return Partition id
148    */
149   int getPartitionId(I vertexId);
150 
151   /**
152    * Whether a partition with given id exists on this worker.
153    *
154    * @param partitionId Partition id
155    * @return True iff this worker has the specified partition
156    */
157   boolean hasPartition(Integer partitionId);
158 
159   /**
160    * Every client will need to get a partition owner from a vertex id so that
161    * they know which worker to sent the request to.
162    *
163    * @param vertexId Vertex index to look for
164    * @return PartitionOnwer that should contain this vertex if it exists
165    */
166   PartitionOwner getVertexPartitionOwner(I vertexId);
167 
168   /**
169    * Get all partition owners.
170    *
171    * @return Iterable through partition owners
172    */
173   Iterable<? extends PartitionOwner> getPartitionOwners();
174 
175   /**
176    * If desired by the user, vertex partitions are redistributed among
177    * workers according to the chosen WorkerGraphPartitioner.
178    *
179    * @param masterSetPartitionOwners Partition owner info passed from the
180    *        master.
181    */
182   void exchangeVertexPartitions(
183       Collection<? extends PartitionOwner> masterSetPartitionOwners);
184 
185   /**
186    * Get the GraphTaskManager that this service is using.  Vertices need to know
187    * this.
188    *
189    * @return the GraphTaskManager instance for this compute node
190    */
191   GraphTaskManager<I, V, E> getGraphTaskManager();
192 
193   /**
194    * Operations that will be called if there is a failure by a worker.
195    */
196   void failureCleanup();
197 
198   /**
199    * Get server data
200    *
201    * @return Server data
202    */
203   ServerData<I, V, E> getServerData();
204 
205   /**
206    * Get worker aggregator handler
207    *
208    * @return Worker aggregator handler
209    */
210   WorkerAggregatorHandler getAggregatorHandler();
211 
212   /**
213    * Final preparation for superstep, called after startSuperstep and
214    * potential loading from checkpoint, right before the computation started
215    * TODO how to avoid this additional function
216    */
217   void prepareSuperstep();
218 
219   /**
220    * Get the superstep output class
221    *
222    * @return SuperstepOutput
223    */
224   SuperstepOutput<I, V, E> getSuperstepOutput();
225 
226   /**
227    * Clean up the service (no calls may be issued after this)
228    *
229    * @param finishedSuperstepStats Finished supestep stats
230    * @throws IOException
231    * @throws InterruptedException
232    */
233   void cleanup(FinishedSuperstepStats finishedSuperstepStats)
234     throws IOException, InterruptedException;
235 
236   /**
237    * Loads Global stats from zookeeper.
238    * @return global stats stored in zookeeper for
239    * previous superstep.
240    */
241   GlobalStats getGlobalStats();
242 
243   /**
244    * Get input splits handler used during input
245    *
246    * @return Input splits handler
247    */
248   WorkerInputSplitsHandler getInputSplitsHandler();
249 
250   /**
251    * Received addresses and partitions assignments from master.
252    *
253    * @param addressesAndPartitions Addresses and partitions assignment
254    */
255   void addressesAndPartitionsReceived(
256       AddressesAndPartitionsWritable addressesAndPartitions);
257 }