This project has retired. For details please refer to its
        
        Attic page.
      
 
HugeArrayUtils xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.util.ArrayList;
21  
22  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
23  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
25  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
26  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
27  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
28  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
29  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfBroadcasts;
30  import org.apache.giraph.block_app.reducers.array.ArrayOfHandles.ArrayOfReducers;
31  import org.apache.giraph.conf.IntConfOption;
32  import org.apache.giraph.function.ObjectHolder;
33  import org.apache.giraph.function.Supplier;
34  import org.apache.giraph.function.primitive.Int2ObjFunction;
35  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
36  import org.apache.giraph.reducers.ReduceOperation;
37  import org.apache.giraph.types.ops.PrimitiveTypeOps;
38  import org.apache.giraph.types.ops.TypeOpsUtils;
39  import org.apache.giraph.types.ops.collections.array.WArrayList;
40  import org.apache.giraph.utils.ArrayWritable;
41  import org.apache.giraph.worker.WorkerBroadcastUsage;
42  import org.apache.hadoop.io.Writable;
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  public class HugeArrayUtils {
67    
68    
69    private static final IntConfOption NUM_STRIPES = new IntConfOption(
70        "giraph.reducers.HugeArrayUtils.num_stripes", 500000,
71        "Number of distict reducers to create. If array is smaller then this" +
72        "number, each element will be it's own reducer");
73  
74    private HugeArrayUtils() { }
75  
76    
77  
78  
79  
80  
81  
82  
83  
84  
85    public static <S, R extends Writable>
86    ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
87        final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
88        final CreateReducersApi reduceApi) {
89      return createGlobalReducerArrayHandle(
90          fixedSize, elementReduceOp, reduceApi,
91          NUM_STRIPES.get(reduceApi.getConf()));
92    }
93  
94    
95  
96  
97  
98  
99  
100 
101 
102 
103 
104   public static <S, R extends Writable>
105   ReducerArrayHandle<S, R> createGlobalReducerArrayHandle(
106       final int fixedSize, final ReduceOperation<S, R> elementReduceOp,
107       final CreateReducersApi reduceApi, int maxNumStripes) {
108     PrimitiveTypeOps<R> typeOps = TypeOpsUtils.getPrimitiveTypeOpsOrNull(
109         (Class<R>) elementReduceOp.createInitialValue().getClass());
110 
111     final CreateReducerFunctionApi
112     createReducer = new CreateReducerFunctionApi() {
113       @Override
114       public <S, R extends Writable> ReducerHandle<S, R> createReducer(
115           ReduceOperation<S, R> reduceOp) {
116         return reduceApi.createGlobalReducer(reduceOp);
117       }
118     };
119 
120     if (fixedSize < maxNumStripes) {
121       return new ArrayOfReducers<>(
122           fixedSize,
123           new Supplier<ReducerHandle<S, R>>() {
124             @Override
125             public ReducerHandle<S, R> get() {
126               return createReducer.createReducer(elementReduceOp);
127             }
128           });
129     } else {
130       final ObjectStriping striping =
131           new ObjectStriping(fixedSize, maxNumStripes);
132 
133       final ArrayList<ReducerArrayHandle<S, R>> handles =
134           new ArrayList<>(striping.getSplits());
135       for (int i = 0; i < striping.getSplits(); i++) {
136         if (typeOps != null) {
137           handles.add(BasicArrayReduce.createArrayHandles(
138               striping.getSplitSize(i), typeOps,
139               elementReduceOp, createReducer));
140         } else {
141           handles.add(ArrayReduce.createArrayHandles(
142               striping.getSplitSize(i), elementReduceOp, createReducer));
143         }
144       }
145 
146       return new ReducerArrayHandle<S, R>() {
147         @Override
148         public ReducerHandle<S, R> get(int index) {
149           if ((index >= fixedSize) || (index < 0)) {
150             throw new RuntimeException(
151                 "Reducer Access out of bounds: requested : " +
152                     index + " from array of size : " + fixedSize);
153           }
154           int reducerIndex = striping.getSplitIndex(index);
155           int insideIndex = striping.getInsideIndex(index);
156           return handles.get(reducerIndex).get(insideIndex);
157         }
158 
159         @Override
160         public int getStaticSize() {
161           return fixedSize;
162         }
163 
164         @Override
165         public int getReducedSize(BlockMasterApi master) {
166           return getStaticSize();
167         }
168 
169         @Override
170         public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
171           throw new UnsupportedOperationException("for now not supported");
172         }
173       };
174     }
175   }
176 
177   
178 
179 
180 
181 
182 
183 
184 
185   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
186       final int count,
187       final Int2ObjFunction<V> valueSupplier,
188       final BlockMasterApi master) {
189     return broadcast(count, valueSupplier, null, master);
190   }
191 
192   
193 
194 
195 
196 
197 
198 
199 
200 
201 
202   public static <V extends Writable> BroadcastArrayHandle<V> broadcast(
203       final int count,
204       final Int2ObjFunction<V> valueSupplier,
205       final PrimitiveTypeOps<V> typeOps,
206       final BlockMasterApi master) {
207     int numStripes = NUM_STRIPES.get(master.getConf());
208     if (count < numStripes) {
209       return new ArrayOfBroadcasts<>(
210           count,
211           new Int2ObjFunction<BroadcastHandle<V>>() {
212             @Override
213             public BroadcastHandle<V> apply(int i) {
214               
215               
216               
217               return master.broadcast(
218                 typeOps != null ?
219                 typeOps.createCopy(valueSupplier.apply(i)) :
220                 valueSupplier.apply(i));
221             }
222           });
223     } else {
224       ObjectStriping striping = new ObjectStriping(count, numStripes);
225       final Int2ObjFunction<BroadcastHandle<V>> handleSupplier;
226 
227       if (typeOps != null) {
228         handleSupplier = getPrimitiveBroadcastHandleSupplier(
229             valueSupplier, typeOps, master, striping);
230       } else {
231         handleSupplier = getObjectBroadcastHandleSupplier(
232             valueSupplier, master, striping);
233       }
234       return new BroadcastArrayHandle<V>() {
235         @Override
236         public BroadcastHandle<V> get(int index) {
237           if (index >= count || index < 0) {
238             throw new RuntimeException(
239                 "Broadcast Access out of bounds: requested: " +
240                   index + " from array of size : " + count);
241           }
242           return handleSupplier.apply(index);
243         }
244 
245         @Override
246         public int getBroadcastedSize(WorkerBroadcastUsage worker) {
247           return count;
248         }
249 
250         @Override
251         public int getStaticSize() {
252           return count;
253         }
254       };
255     }
256   }
257 
258   private static <V extends Writable>
259   Int2ObjFunction<BroadcastHandle<V>> getObjectBroadcastHandleSupplier(
260       final Int2ObjFunction<V> valueSupplier,
261       final BlockMasterApi master, final ObjectStriping striping) {
262     final ObjectHolder<Class<V>> elementClass = new ObjectHolder<>();
263     final ArrayOfHandles<BroadcastHandle<ArrayWritable<V>>> arrayOfBroadcasts =
264       new ArrayOfHandles<>(
265         striping.getSplits(),
266         new Int2ObjFunction<BroadcastHandle<ArrayWritable<V>>>() {
267           @Override
268           public BroadcastHandle<ArrayWritable<V>> apply(int value) {
269             int size = striping.getSplitSize(value);
270             int start = striping.getSplitStart(value);
271             V[] array = (V[]) new Writable[size];
272             for (int i = 0; i < size; i++) {
273               array[i] = valueSupplier.apply(start + i);
274               if (elementClass.get() == null) {
275                 elementClass.apply((Class<V>) array[i].getClass());
276               }
277             }
278             return master.broadcast(
279                 new ArrayWritable<>(elementClass.get(), array));
280           }
281         });
282 
283     final IntRef insideIndex = new IntRef(-1);
284     final ObjectHolder<BroadcastHandle<ArrayWritable<V>>> handleHolder =
285         new ObjectHolder<>();
286 
287     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
288       @Override
289       public V getBroadcast(WorkerBroadcastUsage worker) {
290         return handleHolder.get().getBroadcast(worker).get()[insideIndex.value];
291       }
292     };
293 
294     return createBroadcastHandleSupplier(
295         striping, arrayOfBroadcasts, insideIndex, handleHolder,
296         reusableHandle);
297   }
298 
299   private static <V extends Writable>
300   Int2ObjFunction<BroadcastHandle<V>> getPrimitiveBroadcastHandleSupplier(
301       final Int2ObjFunction<V> valueSupplier, final PrimitiveTypeOps<V> typeOps,
302       final BlockMasterApi master, final ObjectStriping striping) {
303     final ArrayOfHandles<BroadcastHandle<WArrayList<V>>> arrayOfBroadcasts =
304       new ArrayOfHandles<>(
305         striping.getSplits(),
306         new Int2ObjFunction<BroadcastHandle<WArrayList<V>>>() {
307           @Override
308           public BroadcastHandle<WArrayList<V>> apply(int value) {
309             int size = striping.getSplitSize(value);
310             int start = striping.getSplitStart(value);
311             WArrayList<V> array = typeOps.createArrayList(size);
312             for (int i = 0; i < size; i++) {
313               array.addW(valueSupplier.apply(start + i));
314             }
315             return master.broadcast(array);
316           }
317         });
318 
319     final IntRef insideIndex = new IntRef(-1);
320     final ObjectHolder<BroadcastHandle<WArrayList<V>>> handleHolder =
321             new ObjectHolder<>();
322     final BroadcastHandle<V> reusableHandle = new BroadcastHandle<V>() {
323       private final V reusable = typeOps.create();
324       @Override
325       public V getBroadcast(WorkerBroadcastUsage worker) {
326         handleHolder.get().getBroadcast(worker).getIntoW(
327             insideIndex.value, reusable);
328         return reusable;
329       }
330     };
331 
332     return createBroadcastHandleSupplier(
333         striping, arrayOfBroadcasts, insideIndex, handleHolder,
334         reusableHandle);
335   }
336 
337   private static <V extends Writable, A>
338   Int2ObjFunction<BroadcastHandle<V>> createBroadcastHandleSupplier(
339       final ObjectStriping striping,
340       final ArrayOfHandles<BroadcastHandle<A>> arrayOfBroadcasts,
341       final IntRef insideIndex,
342       final ObjectHolder<BroadcastHandle<A>> handleHolder,
343       final BroadcastHandle<V> reusableHandle) {
344     final Int2ObjFunction<BroadcastHandle<V>> handleProvider =
345         new Int2ObjFunction<BroadcastHandle<V>>() {
346       @Override
347       public BroadcastHandle<V> apply(int index) {
348         int broadcastIndex = striping.getSplitIndex(index);
349         insideIndex.value = striping.getInsideIndex(index);
350         handleHolder.apply(arrayOfBroadcasts.get(broadcastIndex));
351         return reusableHandle;
352       }
353     };
354     return handleProvider;
355   }
356 
357   
358 
359 
360 
361   static class ObjectStriping {
362     private final int splits;
363     private final int indicesPerObject;
364     private final int overflowNum;
365     private final int beforeOverflow;
366 
367     public ObjectStriping(int size, int splits) {
368       this.splits = splits;
369       this.indicesPerObject = size / splits;
370       this.overflowNum = size % splits;
371       this.beforeOverflow = overflowNum * (indicesPerObject + 1);
372     }
373 
374     public int getSplits() {
375       return splits;
376     }
377 
378     public int getSplitSize(int splitIndex) {
379       return indicesPerObject + (splitIndex < overflowNum ? 1 : 0);
380     }
381 
382     public int getSplitStart(int splitIndex) {
383       if (splitIndex < overflowNum) {
384         return splitIndex * (indicesPerObject + 1);
385       } else {
386         return beforeOverflow + (splitIndex - overflowNum) * indicesPerObject;
387       }
388     }
389 
390     public int getSplitIndex(int objectIndex) {
391       if (objectIndex < beforeOverflow) {
392         return objectIndex / (indicesPerObject + 1);
393       } else {
394         return (objectIndex - beforeOverflow) / indicesPerObject + overflowNum;
395       }
396     }
397 
398     public int getInsideIndex(int objectIndex) {
399       if (objectIndex < beforeOverflow) {
400         return objectIndex % (indicesPerObject + 1);
401       } else {
402         return (objectIndex - beforeOverflow) % indicesPerObject;
403       }
404     }
405   }
406 }