This project has retired. For details please refer to its Attic page.
MasterCompute xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import java.util.List;
22  
23  import org.apache.giraph.aggregators.Aggregator;
24  import org.apache.giraph.bsp.CentralizedServiceMaster;
25  import org.apache.giraph.combiner.MessageCombiner;
26  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
27  import org.apache.giraph.conf.MessageClasses;
28  import org.apache.giraph.graph.Computation;
29  import org.apache.giraph.graph.GraphState;
30  import org.apache.giraph.reducers.ReduceOperation;
31  import org.apache.giraph.worker.WorkerInfo;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  /**
37   * Interface for defining a master vertex that can perform centralized
38   * computation between supersteps. This class will be instantiated on the
39   * master node and will run every superstep before the workers do.
40   *
41   * Communication with the workers should be performed via aggregators. The
42   * values of the aggregators are broadcast to the workers before
43   * vertex.compute() is called and collected by the master before
44   * master.compute() is called. This means aggregator values used by the workers
45   * are consistent with aggregator values from the master from the same
46   * superstep and aggregator used by the master are consistent with aggregator
47   * values from the workers from the previous superstep.
48   */
49  public abstract class MasterCompute
50      extends DefaultImmutableClassesGiraphConfigurable
51      implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
52    /** If true, do not do anymore computation on this vertex. */
53    private boolean halt = false;
54    /** Master aggregator usage */
55    private CentralizedServiceMaster serviceMaster;
56    /** Graph state */
57    private GraphState graphState;
58    /**
59     * Computation and MessageCombiner classes used, which can be
60     * switched by master
61     */
62    private SuperstepClasses superstepClasses;
63  
64    /**
65     * Must be defined by user to specify what the master has to do.
66     */
67    public abstract void compute();
68  
69    /**
70     * Initialize the MasterCompute class, this is the place to register
71     * aggregators.
72     *
73     * @throws InstantiationException
74     * @throws IllegalAccessException
75     */
76    public abstract void initialize() throws InstantiationException,
77      IllegalAccessException;
78  
79    /**
80     * Retrieves the current superstep.
81     *
82     * @return Current superstep
83     */
84    public final long getSuperstep() {
85      return graphState.getSuperstep();
86    }
87  
88    /**
89     * Get the total (all workers) number of vertices that
90     * existed in the previous superstep.
91     *
92     * @return Total number of vertices (-1 if first superstep)
93     */
94    public final long getTotalNumVertices() {
95      return graphState.getTotalNumVertices();
96    }
97  
98    /**
99     * Get the total (all workers) number of edges that
100    * existed in the previous superstep.
101    *
102    * @return Total number of edges (-1 if first superstep)
103    */
104   public final long getTotalNumEdges() {
105     return graphState.getTotalNumEdges();
106   }
107 
108   /**
109    * After this is called, the computation will stop, even if there are
110    * still messages in the system or vertices that have not voted to halt.
111    */
112   public final void haltComputation() {
113     halt = true;
114   }
115 
116   /**
117    * Has the master halted?
118    *
119    * @return True if halted, false otherwise.
120    */
121   public final boolean isHalted() {
122     return halt;
123   }
124 
125   /**
126    * Get the mapper context
127    *
128    * @return Mapper context
129    */
130   public final Mapper.Context getContext() {
131     return graphState.getContext();
132   }
133 
134   /**
135    * Get list of workers
136    *
137    * @return List of workers
138    */
139   public final List<WorkerInfo> getWorkerInfoList() {
140     return serviceMaster.getWorkerInfoList();
141   }
142 
143   /**
144    * Set Computation class to be used
145    *
146    * @param computationClass Computation class
147    */
148   public final void setComputation(
149       Class<? extends Computation> computationClass) {
150     superstepClasses.setComputationClass(computationClass);
151   }
152 
153   /**
154    * Get Computation class to be used
155    *
156    * @return Computation class
157    */
158   public final Class<? extends Computation> getComputation() {
159     // Might be called prior to classes being set, do not return NPE
160     if (superstepClasses == null) {
161       return null;
162     }
163 
164     return superstepClasses.getComputationClass();
165   }
166 
167   /**
168    * Set MessageCombiner class to be used
169    *
170    * @param combinerClass MessageCombiner class
171    */
172   public final void setMessageCombiner(
173       Class<? extends MessageCombiner> combinerClass) {
174     superstepClasses.setMessageCombinerClass(combinerClass);
175   }
176 
177   /**
178    * Get MessageCombiner class to be used
179    *
180    * @return MessageCombiner class
181    */
182   public final Class<? extends MessageCombiner> getMessageCombiner() {
183     // Might be called prior to classes being set, do not return NPE
184     if (superstepClasses == null) {
185       return null;
186     }
187 
188     return superstepClasses.getMessageCombinerClass();
189   }
190 
191   /**
192    * Set incoming message class to be used
193    *
194    * @param incomingMessageClass incoming message class
195    */
196   @Deprecated
197   public final void setIncomingMessage(
198       Class<? extends Writable> incomingMessageClass) {
199     superstepClasses.setIncomingMessageClass(incomingMessageClass);
200   }
201 
202   /**
203    * Set outgoing message class to be used
204    *
205    * @param outgoingMessageClass outgoing message class
206    */
207   public final void setOutgoingMessage(
208       Class<? extends Writable> outgoingMessageClass) {
209     superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
210   }
211 
212   /**
213    * Set outgoing message classes to be used
214    *
215    * @param outgoingMessageClasses outgoing message classes
216    */
217   public void setOutgoingMessageClasses(
218       MessageClasses<? extends WritableComparable, ? extends Writable>
219         outgoingMessageClasses) {
220     superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
221   }
222 
223   @Override
224   public final <S, R extends Writable> void registerReducer(
225       String name, ReduceOperation<S, R> reduceOp) {
226     serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
227   }
228 
229   @Override
230   public final <S, R extends Writable> void registerReducer(
231       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
232     serviceMaster.getGlobalCommHandler().registerReducer(
233         name, reduceOp, globalInitialValue);
234   }
235 
236   @Override
237   public final <T extends Writable> T getReduced(String name) {
238     return serviceMaster.getGlobalCommHandler().getReduced(name);
239   }
240 
241   @Override
242   public final void broadcast(String name, Writable object) {
243     serviceMaster.getGlobalCommHandler().broadcast(name, object);
244   }
245 
246   @Override
247   public final <A extends Writable> boolean registerAggregator(
248     String name, Class<? extends Aggregator<A>> aggregatorClass)
249     throws InstantiationException, IllegalAccessException {
250     return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
251         name, aggregatorClass);
252   }
253 
254   @Override
255   public final <A extends Writable> boolean registerPersistentAggregator(
256       String name,
257       Class<? extends Aggregator<A>> aggregatorClass) throws
258       InstantiationException, IllegalAccessException {
259     return serviceMaster.getAggregatorTranslationHandler()
260         .registerPersistentAggregator(name, aggregatorClass);
261   }
262 
263   @Override
264   public final <A extends Writable> A getAggregatedValue(String name) {
265     return serviceMaster.getAggregatorTranslationHandler()
266         .<A>getAggregatedValue(name);
267   }
268 
269   @Override
270   public final <A extends Writable> void setAggregatedValue(
271       String name, A value) {
272     serviceMaster.getAggregatorTranslationHandler()
273         .setAggregatedValue(name, value);
274   }
275 
276   /**
277    * Call this to log a line to command line of the job. Use in moderation -
278    * it's a synchronous call to Job client
279    *
280    * @param line Line to print
281    */
282   public void logToCommandLine(String line) {
283     serviceMaster.getJobProgressTracker().logInfo(line);
284   }
285 
286   public final void setGraphState(GraphState graphState) {
287     this.graphState = graphState;
288   }
289 
290   public final void setMasterService(CentralizedServiceMaster serviceMaster) {
291     this.serviceMaster = serviceMaster;
292   }
293 
294   public final void setSuperstepClasses(SuperstepClasses superstepClasses) {
295     this.superstepClasses = superstepClasses;
296   }
297 }