View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece;
19  
20  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
23  import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
24  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
25  import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
26  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
27  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
28  import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
29  import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
30  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
31  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
32  import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
33  import org.apache.giraph.combiner.MessageCombiner;
34  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
35  import org.apache.giraph.conf.EnumConfOption;
36  import org.apache.giraph.conf.GiraphConfigurationSettable;
37  import org.apache.giraph.conf.GiraphConstants;
38  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39  import org.apache.giraph.conf.MessageClasses;
40  import org.apache.giraph.factories.MessageValueFactory;
41  import org.apache.giraph.graph.Vertex;
42  import org.apache.giraph.types.NoMessage;
43  import org.apache.hadoop.io.DoubleWritable;
44  import org.apache.hadoop.io.FloatWritable;
45  import org.apache.hadoop.io.IntWritable;
46  import org.apache.hadoop.io.LongWritable;
47  import org.apache.hadoop.io.Writable;
48  import org.apache.hadoop.io.WritableComparable;
49  
50  import com.google.common.base.Preconditions;
51  
52  /**
53   * Additional abstract implementations for all pieces to be used.
54   * Code here is not in AbstractPiece only to allow for non-standard
55   * non-user-defined pieces. <br>
56   * Only logic used by the underlying framework directly is in AbstractPiece
57   * itself.
58   *
59   * @param <I> Vertex id type
60   * @param <V> Vertex value type
61   * @param <E> Edge value type
62   * @param <M> Message type
63   * @param <WV> Worker value type
64   * @param <WM> Worker message type
65   * @param <S> Execution stage type
66   */
67  @SuppressWarnings({ "rawtypes", "unchecked" })
68  public abstract class DefaultParentPiece<I extends WritableComparable,
69      V extends Writable, E extends Writable, M extends Writable, WV,
70      WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
71    // TODO move to GiraphConstants
72    /**
73     * This option will tell which message encode &amp; store enum to force,
74     * when combining is not enabled.
75     *
76     * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper
77     * and lower bound on message store type, when looking at them in order
78     * from not doing anything special, to most advanced type:
79     * BYTEARRAY_PER_PARTITION,
80     * EXTRACT_BYTEARRAY_PER_PARTITION,
81     * POINTER_LIST_PER_VERTEX
82     * resulting encode type is going to be:
83     * pieceType = piece.allowOneMessageToManyIdsEncoding() ?
84     *    POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION;
85     * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType)))
86     *
87     * This is useful to force all pieces onto particular message store, even
88     * if they do not override allowOneMessageToManyIdsEncoding, though that
89     * might be rarely needed.
90     * This option might be more useful for fully local computation,
91     * where overall job behavior is quite different.
92     */
93    public static final EnumConfOption<MessageEncodeAndStoreType>
94    MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
95        EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
96            MessageEncodeAndStoreType.class,
97            MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
98            "Select the message_encode_and_store_type min force to use");
99    // Helper behind reduceXxx methods; handler is set in wrappedRegisterReducers
100   private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
101   private ReducersForPieceHandler reducersHandler;
102 
103   // Overridable functions
104 
105   /**
106    * Override to register any potential reducers used by this piece,
107    * through calls to {@code reduceApi}, which will return reducer handles
108    * for simple usage. The default implementation registers nothing.
109    * Tip: Without defining a field, first write here the name of the field and
110    * what you want to reduce, like:
111    * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }
112    * and then use tools your IDE provides to generate the field signature
113    * itself, which might be slightly complex:
114    * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }
115    */
116   public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
117   }
118 
119   /**
120    * Override to do vertex send processing. By default null is returned, and
121    * no per-vertex send logic is executed.
122    *
123    * Creates handler that defines what should be executed on worker
124    * during send phase. This logic gets executed first.
125    *
126    * This function is called once on each worker on each thread, in parallel,
127    * on their copy of Piece object to create functions handler.
128    *
129    * If returned object implements VertexPostprocessor interface, then its
130    * postprocess() function is going to be called once, after all vertices
131    * the corresponding thread needed to process are done.
132    */
133   public VertexSender<I, V, E> getVertexSender(
134       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
135     return null;
136   }
137 
138   /**
139    * Override to specify type of the message this Piece sends, if it does
140    * send messages. Must keep returning null when getMessageFactory or
141    * getMessageCombiner returns a value (getMessageClasses enforces this).
142    * If none of the three is overridden, NoMessage is used as message class.
143    */
144   protected Class<M> getMessageClass() {
145     return null;
146   }
147 
148   /**
149    * Override to specify message value factory to be used,
150    * which creates objects into which messages will be deserialized.
151    * The factory must not implement GiraphConfigurationSettable.
152    * If not overridden, or null is returned, DefaultMessageValueFactory
153    * will be used.
154    */
155   protected MessageValueFactory<M> getMessageFactory(
156       ImmutableClassesGiraphConfiguration conf) {
157     return null;
158   }
159 
160   /**
161    * Override to specify message combiner to be used, if any.
162    * The combiner must not implement GiraphConfigurationSettable.
163    * Message combiner itself should be immutable
164    * (i.e. it will be called simultaneously from multiple threads)
165    */
166   protected MessageCombiner<? super I, M> getMessageCombiner(
167       ImmutableClassesGiraphConfiguration conf) {
168     return null;
169   }
170 
171   /**
172    * Override to specify that this Piece allows one to many ids encoding to be
173    * used for messages (not allowed by default).
174    * You should override this function, if you are sending identical message to
175    * all targets, and message itself is not extremely small.
176    */
177   protected boolean allowOneMessageToManyIdsEncoding() {
178     return false;
179   }
180 
181   @Override
182   public MessageClasses<I, M> getMessageClasses(
183       ImmutableClassesGiraphConfiguration conf) {
184     Class<M> messageClass = null;
185     MessageValueFactory<M> messageFactory = getMessageFactory(conf);
186     MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
187     // Infer the message class from the factory first, then from the combiner.
188     if (messageFactory != null) {
189       messageClass = (Class) messageFactory.newInstance().getClass();
190     } else if (messageCombiner != null) {
191       messageClass = (Class) messageCombiner.createInitialMessage().getClass();
192     }
193     // getMessageClass() may only be defined when nothing could be inferred.
194     if (messageClass != null) {
195       Preconditions.checkState(getMessageClass() == null,
196           "Piece %s defines getMessageFactory or getMessageCombiner, " +
197           "so it doesn't need to define getMessageClass.",
198           toString());
199     } else {
200       messageClass = getMessageClass();
201       if (messageClass == null) {
202         messageClass = (Class) NoMessage.class;
203       }
204     }
205 
206     SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
207     if (messageFactory != null) {
208       messageFactorySupplier =
209           new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
210     } else {
211       messageFactorySupplier =
212           new DefaultMessageFactorySupplierFromConf<>(messageClass);
213     }
214 
215     SupplierFromConf<? extends MessageCombiner<? super I, M>>
216     messageCombinerSupplier = null;
217     if (messageCombiner != null) {
218       messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
219     }
220 
221     int maxAllowed =
222         GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
223     int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
224     Preconditions.checkState(maxAllowed >= minForce,
225         "Min forced message encode/store ordinal %s exceeds max allowed %s",
226         minForce, maxAllowed);
227 
228     int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
229         MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
230         MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
231     // clamp the piece's preference into [minForce, maxAllowed]:
232     pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
233 
234     MessageEncodeAndStoreType messageEncodeAndStoreType =
235         MessageEncodeAndStoreType.values()[pieceEncodeType];
236 
237     if (messageFactory instanceof GiraphConfigurationSettable) {
238       throw new IllegalStateException(
239           messageFactory.getClass() + " MessageFactory in " + this +
240           " Piece implements GiraphConfigurationSettable");
241     }
242     if (messageCombiner instanceof GiraphConfigurationSettable) {
243       throw new IllegalStateException(
244           messageCombiner.getClass() + " MessageCombiner in " + this +
245           " Piece implements GiraphConfigurationSettable");
246     }
247 
248     return new ObjectMessageClasses<>(
249         messageClass, messageFactorySupplier,
250         messageCombinerSupplier, messageEncodeAndStoreType);
251   }
252 
253   // Internal implementation
254 
255   @Override
256   public final InnerVertexSender getWrappedVertexSender(
257       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
258     reducersHandler.vertexSenderWorkerPreprocess(workerApi);
259     final VertexSender<I, V, E> userSender =
260         getVertexSender(workerApi, executionStage);
261     return new InnerVertexSender() {
262       @Override
263       public void vertexSend(Vertex<I, V, E> vertex) {
264         if (userSender != null) { // null: piece defines no send logic
265           userSender.vertexSend(vertex);
266         }
267       }
268       @Override
269       public void postprocess() {
270         if (userSender instanceof VertexPostprocessor) {
271           ((VertexPostprocessor) userSender).postprocess();
272         }
273         reducersHandler.vertexSenderWorkerPostprocess(workerApi);
274       }
275     };
276   }
277 
278   @Override
279   public final void wrappedRegisterReducers(
280       BlockMasterApi masterApi, S executionStage) {
281     reducersHandler = new ReducersForPieceHandler(); // fresh handler per call
282     registerReducers(new CreateReducersApiWrapper(
283         masterApi, reducersHandler), executionStage);
284   }
285 
286   // Utility functions: reduce primitive values through the shared reduceUtils.
287   // TODO Java8 - move these as default functions to VertexSender interface
288   protected final void reduceDouble(
289       ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
290     reduceUtils.reduceDouble(reduceHandle, value);
291   }
292 
293   protected final void reduceFloat(
294       ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
295     reduceUtils.reduceFloat(reduceHandle, value);
296   }
297 
298   protected final void reduceLong(
299       ReducerHandle<LongWritable, ?> reduceHandle, long value) {
300     reduceUtils.reduceLong(reduceHandle, value);
301   }
302 
303   protected final void reduceInt(
304       ReducerHandle<IntWritable, ?> reduceHandle, int value) {
305     reduceUtils.reduceInt(reduceHandle, value);
306   }
307 }