This project has retired. For details please refer to its Attic page.
HugeArrayUtils xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.util.ArrayList;
21  
22  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
25  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
26  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
27  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
28  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
29  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
30  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
31  import org.apache.giraph.conf.IntConfOption;
32  import org.apache.giraph.function.ObjectHolder;
33  import org.apache.giraph.function.Supplier;
34  import org.apache.giraph.function.primitive.Int2ObjFunction;
35  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
36  import org.apache.giraph.reducers.ReduceOperation;
37  import org.apache.giraph.types.ops.PrimitiveTypeOps;
38  import org.apache.giraph.types.ops.TypeOpsUtils;
39  import org.apache.giraph.types.ops.collections.array.WArrayList;
40  import org.apache.giraph.utils.ArrayWritable;
41  import org.apache.giraph.worker.WorkerBroadcastUsage;
42  import org.apache.hadoop.io.Writable;
43  
44  /**
45   * Utility class when we are dealing with huge arrays (i.e. large number of
46   * elements) within reducing/broadcasting.
47   *
48   * In Giraph, for each reducer there is a worker machine which is its owner,
49   * which does partial aggregation for it. So if we have only single huge
50   * reducer - other workers will have to wait, while that single worker is doing
51   * huge reducing operation. Additionally single reducer should be smaller than
52   * max netty message, which is 1MB.
53   * On the other hand, each reducer has a meaningful overhead, so we should try
54   * to keep number of reducers as low as possible.
55   *
56   * By default we are being conservative, to keep individual reducers small,
57   * with striping into 500k reducers by default. If you know exact sizes of
58   * your objects you can specify exact number you want.
59   *
60   * So when we have huge array, we don't want one reducer/broadcast for each
61   * element, but we also don't want one reducer/broadcast for the whole array.
62   *
63   * This class allows transparent split into reasonable number of reducers
64   * (~500k), which solves both of the above issues.
65   */
66  public class HugeArrayUtils {
/**
 * Number of stripes (distinct reducers/broadcasts) a huge array is split
 * into, unless the caller passes an explicit value. Even with a 100GB
 * object, the average stripe will be 200KB, keeping outliers mostly under
 * the 1MB limit.
 */
private static final IntConfOption NUM_STRIPES = new IntConfOption(
    "giraph.reducers.HugeArrayUtils.num_stripes", 500000,
    "Number of distinct reducers to create. If array is smaller than this " +
    "number, each element will be its own reducer");

/** Utility class exposing only static members; never instantiated. */
private HugeArrayUtils() { }
75  
76    /**
77     * Create global array of reducers, by splitting the huge array
78     * into NUM_STRIPES number of parts.
79     *
80     * @param fixedSize Number of elements
81     * @param elementReduceOp ReduceOperation for individual elements
82     * @param reduceApi Api for creating reducers
83     * @return Created ReducerArrayHandle
84     */
85    public static <S, R extends Writable>
86    ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
87        final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
88        final CreateReducersApi reduceApi) {
89      return createGlobalReducerArrayHandle(
90          fixedSize, elementReduceOp, reduceApi,
91          NUM_STRIPES.get(reduceApi.getConf()));
92    }
93  
94    /**
95     * Create global array of reducers, by splitting the huge array
96     * into {@code maxNumStripes} number of parts.
97     *
98     * @param fixedSize Number of elements
99     * @param elementReduceOp ReduceOperation for individual elements
100    * @param reduceApi Api for creating reducers
101    * @param maxNumStripes Maximal number of reducers to create.
102    * @return Created ReducerArrayHandle
103    */
104   public static <S, R extends Writable>
105   ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
106       final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
107       final CreateReducersApi reduceApi, int maxNumStripes) {
108     PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
109         (Class<R>) elementReduceOp.createInitialValue().getClass());
110 
111     final CreateReducerFunctionApi
112     createReducer = new CreateReducerFunctionApi() {
113       @Override
114       public <S, R extends Writable> ReducerHandle<S, R> createReducer(
115           ReduceOperation<S, R> reduceOp) {
116         return reduceApi.createGlobalReducer(reduceOp);
117       }
118     };
119 
120     if (fixedSize < maxNumStripes) {
121       return new ArrayOfReducers<>(
122           fixedSize,
123           new Supplier<ReducerHandle<S, R>>() {
124             @Override
125             public ReducerHandle<S, R> get() {
126               return createReducer.createReducer(elementReduceOp);
127             }
128           });
129     } else {
130       final ObjectStriping striping =
131           new ObjectStriping(fixedSize, maxNumStripes);
132 
133       final ArrayList<ReducerArrayHandle<S, R>> handles =
134           new ArrayList<>(striping.getSplits());
135       for (int i = 0; i < striping.getSplits(); i++) {
136         if (typeOps != null) {
137           handles.add(BasicArrayReduce.createArrayHandles(
138               striping.getSplitSize(i), typeOps,
139               elementReduceOp, createReducer));
140         } else {
141           handles.add(ArrayReduce.createArrayHandles(
142               striping.getSplitSize(i), elementReduceOp, createReducer));
143         }
144       }
145 
146       return new ReducerArrayHandle<S, R>() {
147         @Override
148         public ReducerHandle<S, R> get(int index) {
149           if ((index >= fixedSize) || (index < 0)) {
150             throw new RuntimeException(
151                 "Reducer Access out of bounds: requested : " +
152                     index + " from array of size : " + fixedSize);
153           }
154           int reducerIndex = striping.getSplitIndex(index);
155           int insideIndex = striping.getInsideIndex(index);
156           return handles.get(reducerIndex).get(insideIndex);
157         }
158 
159         @Override
160         public int getStaticSize() {
161           return fixedSize;
162         }
163 
164         @Override
165         public int getReducedSize(BlockMasterApi master) {
166           return getStaticSize();
167         }
168 
169         @Override
170         public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
171           throw new UnsupportedOperationException("for now not supported");
172         }
173       };
174     }
175   }
176 
177   /**
178    * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
179    *
180    * @param count Number of elements
181    * @param valueSupplier Supplier of value to be broadcasted for a given index
182    * @param master Master API
183    * @return Created BroadcastArrayHandle
184    */
185   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
186       final int count,
187       final Int2ObjFunction<V> valueSupplier,
188       final BlockMasterApi master) {
189     return broadcast(count, valueSupplier, null, master);
190   }
191 
192   /**
193    * Broadcast a huge array, by splitting into NUM_STRIPES number of parts.
194    * Efficient for primitive types, using BasicArray underneath.
195    *
196    * @param count Number of elements
197    * @param valueSupplier Supplier of value to be broadcasted for a given index
198    * @param typeOps Element TypeOps
199    * @param master Master API
200    * @return Created BroadcastArrayHandle
201    */
202   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
203       final int count,
204       final Int2ObjFunction<V> valueSupplier,
205       final PrimitiveTypeOps<V> typeOps,
206       final BlockMasterApi master) {
207     int numStripes = NUM_STRIPES.get(master.getConf());
208     if (count < numStripes) {
209       return new ArrayOfBroadcasts<>(
210           count,
211           new Int2ObjFunction<BroadcastHandle<V>>() {
212             @Override
213             public BroadcastHandle<V> apply(int i) {
214               // We create a copy because the valueSupplier might return a
215               // reusable obj. This function is NOT safe if typeOps is null
216               // & valueSupplier returns reusable
217               return master.broadcast(
218                 typeOps != null ?
219                 typeOps.createCopy(valueSupplier.apply(i)) :
220                 valueSupplier.apply(i));
221             }
222           });
223     } else {
224       ObjectStriping striping = new ObjectStriping(count, numStripes);
225       final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
226 
227       if (typeOps != null) {
228         handleSupplier = getPrimitiveBroadcastHandleSupplier(
229             valueSupplier, typeOps, master, striping);
230       } else {
231         handleSupplier = getObjectBroadcastHandleSupplier(
232             valueSupplier, master, striping);
233       }
234       return new BroadcastArrayHandle<V>() {
235         @Override
236         public BroadcastHandle<V> get(int index) {
237           if (index >= count || index < 0) {
238             throw new RuntimeException(
239                 "Broadcast Access out of bounds: requested: " +
240                   index + " from array of size : " + count);
241           }
242           return handleSupplier.apply(index);
243         }
244 
245         @Override
246         public int getBroadcastedSize(WorkerBroadcastUsage worker) {
247           return count;
248         }
249 
250         @Override
251         public int getStaticSize() {
252           return count;
253         }
254       };
255     }
256   }
257 
258   private static <V extends Writable>
259   Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
260       final Int2ObjFunction<V> valueSupplier,
261       final BlockMasterApi master, final ObjectStriping striping) {
262     final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
263     final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
264       new ArrayOfHandles<>(
265         striping.getSplits(),
266         new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
267           @Override
268           public BroadcastHandle<ArrayWritable<V>> apply(int value) {
269             int size = striping.getSplitSize(value);
270             int start = striping.getSplitStart(value);
271             V[] array = (V[]) new Writable[size];
272             for (int i = 0; i < size; i++) {
273               array[i] = valueSupplier.apply(start + i);
274               if (elementClass.get() == null) {
275                 elementClass.apply((Class<V>) array[i].getClass());
276               }
277             }
278             return master.broadcast(
279                 new ArrayWritable<>(elementClass.get(), array));
280           }
281         });
282 
283     final IntRef insideIndex = new IntRef(-1);
284     final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
285         new ObjectHolder<>();
286 
287     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
288       @Override
289       public V getBroadcast(WorkerBroadcastUsage worker) {
290         return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
291       }
292     };
293 
294     return createBroadcastHandleSupplier(
295         striping, arrayOfBroadcasts, insideIndex, handleHolder,
296         reusableHandle);
297   }
298 
299   private static <V extends Writable>
300   Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
301       final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
302       final BlockMasterApi master, final ObjectStriping striping) {
303     final ArrayOfHandles<BroadcastHandle<WArrayList<V>>> arrayOfBroadcasts =
304       new ArrayOfHandles<>(
305         striping.getSplits(),
306         new Int2ObjFunction<BroadcastHandle<WArrayList<V>>>() {
307           @Override
308           public BroadcastHandle<WArrayList<V>> apply(int value) {
309             int size = striping.getSplitSize(value);
310             int start = striping.getSplitStart(value);
311             WArrayList<V> array = typeOps.createArrayList(size);
312             for (int i = 0; i < size; i++) {
313               array.addW(valueSupplier.apply(start + i));
314             }
315             return master.broadcast(array);
316           }
317         });
318 
319     final IntRef insideIndex = new IntRef(-1);
320     final ObjectHolder<BroadcastHandle<WArrayList<V>>> handleHolder =
321             new ObjectHolder<>();
322     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
323       private final V reusable = typeOps.create();
324       @Override
325       public V getBroadcast(WorkerBroadcastUsage worker) {
326         handleHolder.get().getBroadcast(worker).getIntoW(
327             insideIndex.value, reusable);
328         return reusable;
329       }
330     };
331 
332     return createBroadcastHandleSupplier(
333         striping, arrayOfBroadcasts, insideIndex, handleHolder,
334         reusableHandle);
335   }
336 
337   private static <V extends Writable, A>
338   Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
339       final ObjectStriping striping,
340       final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
341       final IntRef insideIndex,
342       final ObjectHolder<BroadcastHandle<A>> handleHolder,
343       final BroadcastHandle<V> reusableHandle) {
344     final Int2ObjFunction<BroadcastHandle<V>> handleProvider =
345         new Int2ObjFunction<BroadcastHandle<V>>() {
346       @Override
347       public BroadcastHandle<V> apply(int index) {
348         int broadcastIndex = striping.getSplitIndex(index);
349         insideIndex.value = striping.getInsideIndex(index);
350         handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex));
351         return reusableHandle;
352       }
353     };
354     return handleProvider;
355   }
356 
357   /**
358    * Handles indices calculations when spliting one range into smaller number
359    * of splits, where indices stay consecutive.
360    */
361   static class ObjectStriping {
362     private final int splits;
363     private final int indicesPerObject;
364     private final int overflowNum;
365     private final int beforeOverflow;
366 
367     public ObjectStriping(int size, int splits) {
368       this.splits = splits;
369       this.indicesPerObject = size / splits;
370       this.overflowNum = size % splits;
371       this.beforeOverflow = overflowNum * (indicesPerObject + 1);
372     }
373 
374     public int getSplits() {
375       return splits;
376     }
377 
378     public int getSplitSize(int splitIndex) {
379       return indicesPerObject + (splitIndex < overflowNum ? 1 : 0);
380     }
381 
382     public int getSplitStart(int splitIndex) {
383       if (splitIndex < overflowNum) {
384         return splitIndex * (indicesPerObject + 1);
385       } else {
386         return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject;
387       }
388     }
389 
390     public int getSplitIndex(int objectIndex) {
391       if (objectIndex < beforeOverflow) {
392         return objectIndex / (indicesPerObject + 1);
393       } else {
394         return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum;
395       }
396     }
397 
398     public int getInsideIndex(int objectIndex) {
399       if (objectIndex < beforeOverflow) {
400         return objectIndex % (indicesPerObject + 1);
401       } else {
402         return (objectIndex - beforeOverflow) % indicesPerObject;
403       }
404     }
405   }
406 }