/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.giraph.bsp;
20  
21  import java.io.IOException;
22  import java.util.Collection;
23  import java.util.List;
24  
25  import org.apache.giraph.comm.ServerData;
26  import org.apache.giraph.comm.WorkerClient;
27  import org.apache.giraph.comm.messages.PartitionSplitInfo;
28  import org.apache.giraph.graph.AddressesAndPartitionsWritable;
29  import org.apache.giraph.graph.FinishedSuperstepStats;
30  import org.apache.giraph.graph.GlobalStats;
31  import org.apache.giraph.graph.GraphTaskManager;
32  import org.apache.giraph.graph.VertexEdgeCount;
33  import org.apache.giraph.io.superstep_output.SuperstepOutput;
34  import org.apache.giraph.metrics.GiraphTimerContext;
35  import org.apache.giraph.partition.PartitionOwner;
36  import org.apache.giraph.partition.PartitionStats;
37  import org.apache.giraph.partition.PartitionStore;
38  import org.apache.giraph.worker.WorkerAggregatorHandler;
39  import org.apache.giraph.worker.WorkerContext;
40  import org.apache.giraph.worker.WorkerInfo;
41  import org.apache.giraph.worker.WorkerInputSplitsHandler;
42  import org.apache.giraph.worker.WorkerObserver;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  
46  /**
47   * All workers should have access to this centralized service to
48   * execute the following methods.
49   *
50   * @param <I> Vertex id
51   * @param <V> Vertex value
52   * @param <E> Edge value
53   */
54  @SuppressWarnings("rawtypes")
55  public interface CentralizedServiceWorker<I extends WritableComparable,
56    V extends Writable, E extends Writable>
57    extends CentralizedService<I, V, E>, PartitionSplitInfo<I> {
58    /**
59     * Setup (must be called prior to any other function)
60     *
61     * @return Finished superstep stats for the input superstep
62     */
63    FinishedSuperstepStats setup();
64  
65    /**
66     * Get the worker information
67     *
68     * @return Worker information
69     */
70    WorkerInfo getWorkerInfo();
71  
72    /**
73     * Get the worker client (for instantiating WorkerClientRequestProcessor
74     * instances.
75     *
76     * @return Worker client
77     */
78    WorkerClient<I, V, E> getWorkerClient();
79  
80    /**
81     * Get the worker context.
82     *
83     * @return worker's WorkerContext
84     */
85    WorkerContext getWorkerContext();
86  
87    /**
88     * Get the observers for this Worker.
89     *
90     * @return array of WorkerObservers.
91     */
92    WorkerObserver[] getWorkerObservers();
93  
94    /**
95     * Get the partition store for this worker.
96     * The partitions contain the vertices for
97     * this worker and can be used to run compute() for the vertices or do
98     * checkpointing.
99     *
100    * @return The partition store for this worker.
101    */
102   PartitionStore<I, V, E> getPartitionStore();
103 
104   /**
105    *  Both the vertices and the messages need to be checkpointed in order
106    *  for them to be used.  This is done after all messages have been
107    *  delivered, but prior to a superstep starting.
108    */
109   void storeCheckpoint() throws IOException;
110 
111   /**
112    * Load the vertices, edges, messages from the beginning of a superstep.
113    * Will load the vertex partitions as designated by the master and set the
114    * appropriate superstep.
115    *
116    * @param superstep which checkpoint to use
117    * @return Graph-wide vertex and edge counts
118    * @throws IOException
119    */
120   VertexEdgeCount loadCheckpoint(long superstep) throws IOException;
121 
122   /**
123    * Take all steps prior to actually beginning the computation of a
124    * superstep.
125    *
126    * @return Collection of all the partition owners from the master for this
127    *         superstep.
128    */
129   Collection<? extends PartitionOwner> startSuperstep();
130 
131   /**
132    * Worker is done with its portion of the superstep.  Report the
133    * worker level statistics after the computation.
134    *
135    * @param partitionStatsList All the partition stats for this worker
136    * @param superstepTimerContext superstep timer context only given when the
137    *      function needs to stop the timer, otherwise null.
138    * @return Stats of the superstep completion
139    */
140   FinishedSuperstepStats finishSuperstep(
141       List<PartitionStats> partitionStatsList,
142       GiraphTimerContext superstepTimerContext);
143 
144   /**
145    * Get the partition id that a vertex id would belong to.
146    *
147    * @param vertexId Vertex id
148    * @return Partition id
149    */
150   @Override
151   int getPartitionId(I vertexId);
152 
153   /**
154    * Whether a partition with given id exists on this worker.
155    *
156    * @param partitionId Partition id
157    * @return True iff this worker has the specified partition
158    */
159   boolean hasPartition(Integer partitionId);
160 
161   /**
162    * Every client will need to get a partition owner from a vertex id so that
163    * they know which worker to sent the request to.
164    *
165    * @param vertexId Vertex index to look for
166    * @return PartitionOnwer that should contain this vertex if it exists
167    */
168   PartitionOwner getVertexPartitionOwner(I vertexId);
169 
170   /**
171    * Get all partition owners.
172    *
173    * @return Iterable through partition owners
174    */
175   Iterable<? extends PartitionOwner> getPartitionOwners();
176 
177   /**
178    * If desired by the user, vertex partitions are redistributed among
179    * workers according to the chosen WorkerGraphPartitioner.
180    *
181    * @param masterSetPartitionOwners Partition owner info passed from the
182    *        master.
183    */
184   void exchangeVertexPartitions(
185       Collection<? extends PartitionOwner> masterSetPartitionOwners);
186 
187   /**
188    * Get the GraphTaskManager that this service is using.  Vertices need to know
189    * this.
190    *
191    * @return the GraphTaskManager instance for this compute node
192    */
193   GraphTaskManager<I, V, E> getGraphTaskManager();
194 
195   /**
196    * Operations that will be called if there is a failure by a worker.
197    */
198   void failureCleanup();
199 
200   /**
201    * Get server data
202    *
203    * @return Server data
204    */
205   ServerData<I, V, E> getServerData();
206 
207   /**
208    * Get worker aggregator handler
209    *
210    * @return Worker aggregator handler
211    */
212   WorkerAggregatorHandler getAggregatorHandler();
213 
214   /**
215    * Final preparation for superstep, called after startSuperstep and
216    * potential loading from checkpoint, right before the computation started
217    * TODO how to avoid this additional function
218    */
219   void prepareSuperstep();
220 
221   /**
222    * Get the superstep output class
223    *
224    * @return SuperstepOutput
225    */
226   SuperstepOutput<I, V, E> getSuperstepOutput();
227 
228   /**
229    * Clean up the service (no calls may be issued after this)
230    *
231    * @param finishedSuperstepStats Finished supestep stats
232    * @throws IOException
233    * @throws InterruptedException
234    */
235   void cleanup(FinishedSuperstepStats finishedSuperstepStats)
236     throws IOException, InterruptedException;
237 
238   /**
239    * Loads Global stats from zookeeper.
240    * @return global stats stored in zookeeper for
241    * previous superstep.
242    */
243   GlobalStats getGlobalStats();
244 
245   /**
246    * Get input splits handler used during input
247    *
248    * @return Input splits handler
249    */
250   WorkerInputSplitsHandler getInputSplitsHandler();
251 
252   /**
253    * Received addresses and partitions assignments from master.
254    *
255    * @param addressesAndPartitions Addresses and partitions assignment
256    */
257   void addressesAndPartitionsReceived(
258       AddressesAndPartitionsWritable addressesAndPartitions);
259 }