View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import java.util.List;
22  
23  import org.apache.giraph.aggregators.Aggregator;
24  import org.apache.giraph.bsp.CentralizedServiceMaster;
25  import org.apache.giraph.combiner.MessageCombiner;
26  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
27  import org.apache.giraph.conf.MessageClasses;
28  import org.apache.giraph.graph.Computation;
29  import org.apache.giraph.graph.GraphState;
30  import org.apache.giraph.reducers.ReduceOperation;
31  import org.apache.giraph.worker.WorkerInfo;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  import org.apache.hadoop.mapreduce.Mapper;
35  
36  /**
37   * Interface for defining a master vertex that can perform centralized
38   * computation between supersteps. This class will be instantiated on the
39   * master node and will run every superstep before the workers do.
40   *
41   * Communication with the workers should be performed via aggregators. The
42   * values of the aggregators are broadcast to the workers before
43   * vertex.compute() is called and collected by the master before
44   * master.compute() is called. This means aggregator values used by the workers
45   * are consistent with aggregator values from the master from the same
46   * superstep and aggregator used by the master are consistent with aggregator
47   * values from the workers from the previous superstep.
48   */
49  public abstract class MasterCompute
50      extends DefaultImmutableClassesGiraphConfigurable
51      implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable {
52    /** If true, do not do anymore computation on this vertex. */
53    private boolean halt = false;
54    /** Master aggregator usage */
55    private CentralizedServiceMaster serviceMaster;
56    /** Graph state */
57    private GraphState graphState;
58    /**
59     * Computation and MessageCombiner classes used, which can be
60     * switched by master
61     */
62    private SuperstepClasses superstepClasses;
63  
64    /**
65     * Must be defined by user to specify what the master has to do.
66     */
67    public abstract void compute();
68  
69    /**
70     * Initialize the MasterCompute class, this is the place to register
71     * aggregators.
72     */
73    public abstract void initialize() throws InstantiationException,
74      IllegalAccessException;
75  
76    /**
77     * Retrieves the current superstep.
78     *
79     * @return Current superstep
80     */
81    public final long getSuperstep() {
82      return graphState.getSuperstep();
83    }
84  
85    /**
86     * Get the total (all workers) number of vertices that
87     * existed in the previous superstep.
88     *
89     * @return Total number of vertices (-1 if first superstep)
90     */
91    public final long getTotalNumVertices() {
92      return graphState.getTotalNumVertices();
93    }
94  
95    /**
96     * Get the total (all workers) number of edges that
97     * existed in the previous superstep.
98     *
99     * @return Total number of edges (-1 if first superstep)
100    */
101   public final long getTotalNumEdges() {
102     return graphState.getTotalNumEdges();
103   }
104 
105   /**
106    * After this is called, the computation will stop, even if there are
107    * still messages in the system or vertices that have not voted to halt.
108    */
109   public final void haltComputation() {
110     halt = true;
111   }
112 
113   /**
114    * Has the master halted?
115    *
116    * @return True if halted, false otherwise.
117    */
118   public final boolean isHalted() {
119     return halt;
120   }
121 
122   /**
123    * Get the mapper context
124    *
125    * @return Mapper context
126    */
127   public final Mapper.Context getContext() {
128     return graphState.getContext();
129   }
130 
131   /**
132    * Get list of workers
133    *
134    * @return List of workers
135    */
136   public final List<WorkerInfo> getWorkerInfoList() {
137     return serviceMaster.getWorkerInfoList();
138   }
139 
140   /**
141    * Set Computation class to be used
142    *
143    * @param computationClass Computation class
144    */
145   public final void setComputation(
146       Class<? extends Computation> computationClass) {
147     superstepClasses.setComputationClass(computationClass);
148   }
149 
150   /**
151    * Get Computation class to be used
152    *
153    * @return Computation class
154    */
155   public final Class<? extends Computation> getComputation() {
156     // Might be called prior to classes being set, do not return NPE
157     if (superstepClasses == null) {
158       return null;
159     }
160 
161     return superstepClasses.getComputationClass();
162   }
163 
164   /**
165    * Set MessageCombiner class to be used
166    *
167    * @param combinerClass MessageCombiner class
168    */
169   public final void setMessageCombiner(
170       Class<? extends MessageCombiner> combinerClass) {
171     superstepClasses.setMessageCombinerClass(combinerClass);
172   }
173 
174   /**
175    * Get MessageCombiner class to be used
176    *
177    * @return MessageCombiner class
178    */
179   public final Class<? extends MessageCombiner> getMessageCombiner() {
180     // Might be called prior to classes being set, do not return NPE
181     if (superstepClasses == null) {
182       return null;
183     }
184 
185     return superstepClasses.getMessageCombinerClass();
186   }
187 
188   /**
189    * Set incoming message class to be used
190    *
191    * @param incomingMessageClass incoming message class
192    */
193   @Deprecated
194   public final void setIncomingMessage(
195       Class<? extends Writable> incomingMessageClass) {
196     superstepClasses.setIncomingMessageClass(incomingMessageClass);
197   }
198 
199   /**
200    * Set outgoing message class to be used
201    *
202    * @param outgoingMessageClass outgoing message class
203    */
204   public final void setOutgoingMessage(
205       Class<? extends Writable> outgoingMessageClass) {
206     superstepClasses.setOutgoingMessageClass(outgoingMessageClass);
207   }
208 
209   /**
210    * Set outgoing message classes to be used
211    *
212    * @param outgoingMessageClasses outgoing message classes
213    */
214   public void setOutgoingMessageClasses(
215       MessageClasses<? extends WritableComparable, ? extends Writable>
216         outgoingMessageClasses) {
217     superstepClasses.setOutgoingMessageClasses(outgoingMessageClasses);
218   }
219 
220   @Override
221   public final <S, R extends Writable> void registerReducer(
222       String name, ReduceOperation<S, R> reduceOp) {
223     serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp);
224   }
225 
226   @Override
227   public final <S, R extends Writable> void registerReducer(
228       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
229     serviceMaster.getGlobalCommHandler().registerReducer(
230         name, reduceOp, globalInitialValue);
231   }
232 
233   @Override
234   public final <T extends Writable> T getReduced(String name) {
235     return serviceMaster.getGlobalCommHandler().getReduced(name);
236   }
237 
238   @Override
239   public final void broadcast(String name, Writable object) {
240     serviceMaster.getGlobalCommHandler().broadcast(name, object);
241   }
242 
243   @Override
244   public final <A extends Writable> boolean registerAggregator(
245     String name, Class<? extends Aggregator<A>> aggregatorClass)
246     throws InstantiationException, IllegalAccessException {
247     return serviceMaster.getAggregatorTranslationHandler().registerAggregator(
248         name, aggregatorClass);
249   }
250 
251   @Override
252   public final <A extends Writable> boolean registerPersistentAggregator(
253       String name,
254       Class<? extends Aggregator<A>> aggregatorClass) throws
255       InstantiationException, IllegalAccessException {
256     return serviceMaster.getAggregatorTranslationHandler()
257         .registerPersistentAggregator(name, aggregatorClass);
258   }
259 
260   @Override
261   public final <A extends Writable> A getAggregatedValue(String name) {
262     return serviceMaster.getAggregatorTranslationHandler()
263         .<A>getAggregatedValue(name);
264   }
265 
266   @Override
267   public final <A extends Writable> void setAggregatedValue(
268       String name, A value) {
269     serviceMaster.getAggregatorTranslationHandler()
270         .setAggregatedValue(name, value);
271   }
272 
273   /**
274    * Call this to log a line to command line of the job. Use in moderation -
275    * it's a synchronous call to Job client
276    *
277    * @param line Line to print
278    */
279   public void logToCommandLine(String line) {
280     serviceMaster.getJobProgressTracker().logInfo(line);
281   }
282 
283   public final void setGraphState(GraphState graphState) {
284     this.graphState = graphState;
285   }
286 
287   public final void setMasterService(CentralizedServiceMaster serviceMaster) {
288     this.serviceMaster = serviceMaster;
289   }
290 
291   public final void setSuperstepClasses(SuperstepClasses superstepClasses) {
292     this.superstepClasses = superstepClasses;
293   }
294 }