This project has retired. For details please refer to its Attic page.
DefaultParentPiece xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece;
19  
20  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
23  import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
24  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25  import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
26  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
27  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
28  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29  import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
30  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
31  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
32  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
33  import org.apache.giraph.combiner.MessageCombiner;
34  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
35  import org.apache.giraph.conf.EnumConfOption;
36  import org.apache.giraph.conf.GiraphConfigurationSettable;
37  import org.apache.giraph.conf.GiraphConstants;
38  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39  import org.apache.giraph.conf.MessageClasses;
40  import org.apache.giraph.factories.MessageValueFactory;
41  import org.apache.giraph.graph.Vertex;
42  import org.apache.giraph.types.NoMessage;
43  import org.apache.hadoop.io.DoubleWritable;
44  import org.apache.hadoop.io.FloatWritable;
45  import org.apache.hadoop.io.IntWritable;
46  import org.apache.hadoop.io.LongWritable;
47  import org.apache.hadoop.io.Writable;
48  import org.apache.hadoop.io.WritableComparable;
49  
50  import com.google.common.base.Preconditions;
51  
52  /**
53   * Additional abstract implementations for all pieces to be used.
54   * Code here is not in AbstractPiece only to allow for non-standard
55   * non-user-defined pieces. <br>
56   * Only logic used by the underlying framework directly is in AbstractPiece
57   * itself.
58   *
59   * @param <I> Vertex id type
60   * @param <V> Vertex value type
61   * @param <E> Edge value type
62   * @param <M> Message type
63   * @param <WV> Worker value type
64   * @param <WM> Worker message type
65   * @param <S> Execution stage type
66   */
67  @SuppressWarnings({ "rawtypes", "unchecked" })
68  public abstract class DefaultParentPiece<I extends WritableComparable,
69      V extends Writable, E extends Writable, M extends Writable, WV,
70      WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
71    // TODO move to GiraphConstants
72    /**
73     * This option will tell which message encode &amp; store enum to force,
74     * when combining is not enabled.
75     *
76     * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper
77     * and lower bound on message store type, when looking them in order from
78     * not doing anything special, to most advanced type:
79     * BYTEARRAY_PER_PARTITION,
80     * EXTRACT_BYTEARRAY_PER_PARTITION,
81     * POINTER_LIST_PER_VERTEX
82     * resulting encode type is going to be:
83     * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ?
84     *    POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION;
85     * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType)))
86     *
87     * This is useful to force all pieces onto particular message store, even
88     * if they do not override allowOneMessageToManyIdsEncoding, though that
89     * might be rarely needed.
90     * This option might be more useful for fully local computation,
91     * where overall job behavior is quite different.
92     */
93    public static final EnumConfOption<MessageEncodeAndStoreType>
94    MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
95        EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
96            MessageEncodeAndStoreType.class,
97            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
98            "Select the message_encode_and_store_type min force to use");
99  
100   private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
101   private ReducersForPieceHandler reducersHandler;
102 
103   // Overridable functions
104 
105   /**
106    * Override to register any potential reducers used by this piece,
107    * through calls to {@code reduceApi}, which will return reducer handles
108    * for simple.
109    * Tip: Without defining a field, first write here name of the field and what
110    * you want to reduce, like:
111    * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }
112    * and then use tools your IDE provides to generate field signature itself,
113    * which might be slightly complex:
114    * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }
115    */
116   public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
117   }
118 
119   /**
120    * Override to do vertex send processing.
121    *
122    * Creates handler that defines what should be executed on worker
123    * during send phase.
124    *
125    * This logic gets executed first.
126    * This function is called once on each worker on each thread, in parallel,
127    * on their copy of Piece object to create functions handler.
128    *
129    * If returned object implements Postprocessor interface, then corresponding
130    * postprocess() function is going to be called once, after all vertices
131    * corresponding thread needed to process are done.
132    */
133   public VertexSender<I, V, E> getVertexSender(
134       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
135     return null;
136   }
137 
138   /**
139    * Override to specify type of the message this Piece sends, if it does
140    * send messages.
141    *
142    * If not overwritten, no messages can be sent.
143    */
144   protected Class<M> getMessageClass() {
145     return null;
146   }
147 
148   /**
149    * Override to specify message value factory to be used,
150    * which creates objects into which messages will be deserialized.
151    *
152    * If not overwritten, or null is returned, DefaultMessageValueFactory
153    * will be used.
154    */
155   protected MessageValueFactory<M> getMessageFactory(
156       ImmutableClassesGiraphConfiguration conf) {
157     return null;
158   }
159 
160   /**
161    * Override to specify message combiner to be used, if any.
162    *
163    * Message combiner itself should be immutable
164    * (i.e. it will be call simultanously from multiple threads)
165    */
166   protected MessageCombiner<? super I, M> getMessageCombiner(
167       ImmutableClassesGiraphConfiguration conf) {
168     return null;
169   }
170 
171   /**
172    * Override to specify that this Piece allows one to many ids encoding to be
173    * used for messages.
174    * You should override this function, if you are sending identical message to
175    * all targets, and message itself is not extremely small.
176    */
177   protected boolean allowOneMessageToManyIdsEncoding() {
178     return false;
179   }
180 
181   /**
182    * Override to specify that receive of this Piece (and send of next Piece)
183    * ignore existing vertices, and just process received messages.
184    *
185    * Useful when distributed processing on groups that are not vertices is
186    * needed. This flag allows you not to worry whether a destination vertex
187    * exist, and removes need to clean it up when finished.
188    * One example is if each vertex is in a cluster, and we need to process
189    * something per cluster.
190    *
191    * Alternative are reducers, which have distributed reduction, but mostly
192    * master still does the processing afterwards, and amount of data needs to
193    * fit single machine (master).
194    */
195   protected boolean receiveIgnoreExistingVertices() {
196     return false;
197   }
198 
199   @Override
200   public MessageClasses<I, M> getMessageClasses(
201       ImmutableClassesGiraphConfiguration conf) {
202     Class<M> messageClass = null;
203     MessageValueFactory<M> messageFactory = getMessageFactory(conf);
204     MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
205 
206     if (messageFactory != null) {
207       messageClass = (Class) messageFactory.newInstance().getClass();
208     } else if (messageCombiner != null) {
209       messageClass = (Class) messageCombiner.createInitialMessage().getClass();
210     }
211 
212     if (messageClass != null) {
213       Preconditions.checkState(getMessageClass() == null,
214           "Piece %s defines getMessageFactory or getMessageCombiner, " +
215           "so it doesn't need to define getMessageClass.",
216           toString());
217     } else {
218       messageClass = getMessageClass();
219       if (messageClass == null) {
220         messageClass = (Class) NoMessage.class;
221       }
222     }
223 
224     SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
225     if (messageFactory != null) {
226       messageFactorySupplier =
227           new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
228     } else {
229       messageFactorySupplier =
230           new DefaultMessageFactorySupplierFromConf<>(messageClass);
231     }
232 
233     SupplierFromConf<? extends MessageCombiner<? super I, M>>
234     messageCombinerSupplier;
235     if (messageCombiner != null) {
236       messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
237     } else {
238       messageCombinerSupplier = null;
239     }
240 
241     int maxAllowed =
242         GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
243     int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
244     Preconditions.checkState(maxAllowed >= minForce);
245 
246     int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
247         MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
248         MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
249     // bound piece type with boundaries:
250     pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
251 
252     MessageEncodeAndStoreType messageEncodeAndStoreType =
253         MessageEncodeAndStoreType.values()[pieceEncodeType];
254 
255     if (messageFactory instanceof GiraphConfigurationSettable) {
256       throw new IllegalStateException(
257           messageFactory.getClass() + " MessageFactory in " + this +
258           " Piece implements GiraphConfigurationSettable");
259     }
260     if (messageCombiner instanceof GiraphConfigurationSettable) {
261       throw new IllegalStateException(
262           messageCombiner.getClass() + " MessageCombiner in " + this +
263           " Piece implements GiraphConfigurationSettable");
264     }
265 
266     return new ObjectMessageClasses<>(
267         messageClass, messageFactorySupplier,
268         messageCombinerSupplier, messageEncodeAndStoreType,
269         receiveIgnoreExistingVertices());
270   }
271 
272   // Internal implementation
273 
274   @Override
275   public final InnerVertexSender getWrappedVertexSender(
276       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
277     reducersHandler.vertexSenderWorkerPreprocess(workerApi);
278     final VertexSender<I, V, E> functions =
279         getVertexSender(workerApi, executionStage);
280     return new InnerVertexSender() {
281       @Override
282       public void vertexSend(Vertex<I, V, E> vertex) {
283         if (functions != null) {
284           functions.vertexSend(vertex);
285         }
286       }
287       @Override
288       public void postprocess() {
289         if (functions instanceof VertexPostprocessor) {
290           ((VertexPostprocessor) functions).postprocess();
291         }
292         reducersHandler.vertexSenderWorkerPostprocess(workerApi);
293       }
294     };
295   }
296 
297   @Override
298   public final void wrappedRegisterReducers(
299       BlockMasterApi masterApi, S executionStage) {
300     reducersHandler = new ReducersForPieceHandler();
301     registerReducers(new CreateReducersApiWrapper(
302         masterApi, reducersHandler), executionStage);
303   }
304 
305   // utility functions:
306   // TODO Java8 - move these as default functions to VertexSender interface
307   protected final void reduceDouble(
308       ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
309     reduceUtils.reduceDouble(reduceHandle, value);
310   }
311 
312   protected final void reduceFloat(
313       ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
314     reduceUtils.reduceFloat(reduceHandle, value);
315   }
316 
317   protected final void reduceLong(
318       ReducerHandle<LongWritable, ?> reduceHandle, long value) {
319     reduceUtils.reduceLong(reduceHandle, value);
320   }
321 
322   protected final void reduceInt(
323       ReducerHandle<IntWritable, ?> reduceHandle, int value) {
324     reduceUtils.reduceInt(reduceHandle, value);
325   }
326 }