This project has been retired. For details, please refer to its Attic page.
      
 
BasicArrayReduce xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.commons.lang3.tuple.MutablePair;
25  import org.apache.commons.lang3.tuple.Pair;
26  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
28  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
29  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
33  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
34  import org.apache.giraph.master.MasterGlobalCommUsage;
35  import org.apache.giraph.reducers.ReduceOperation;
36  import org.apache.giraph.types.ops.PrimitiveTypeOps;
37  import org.apache.giraph.types.ops.TypeOpsUtils;
38  import org.apache.giraph.types.ops.collections.array.WArrayList;
39  import org.apache.giraph.utils.WritableUtils;
40  import org.apache.giraph.worker.WorkerBroadcastUsage;
41  import org.apache.hadoop.io.Writable;
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  public class BasicArrayReduce<S, R extends Writable>
53      implements ReduceOperation<Pair<IntRef, S>, WArrayList<R>> {
54    private int fixedSize;
55    private PrimitiveTypeOps<R> typeOps;
56    private ReduceOperation<S, R> elementReduceOp;
57    private R initialElement;
58    private R reusable;
59    private R reusable2;
60  
61    public BasicArrayReduce() {
62    }
63  
64  
65    
66  
67  
68  
69  
70  
71  
72  
73    public BasicArrayReduce(
74        int fixedSize,
75        PrimitiveTypeOps<R> typeOps,
76        ReduceOperation<S, R> elementReduceOp) {
77      this.fixedSize = fixedSize;
78      this.typeOps = typeOps;
79      this.elementReduceOp = elementReduceOp;
80      init();
81    }
82  
83  
84    
85  
86  
87  
88  
89  
90  
91    public BasicArrayReduce(
92        PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
93      this(-1, typeOps, elementReduceOp);
94    }
95  
96  
97    
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110   public static <S, R extends Writable>
111   ReducerArrayHandle<S, R> createLocalArrayHandles(
112       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
113       CreateReducersApi reduceApi) {
114     return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
115   }
116 
117   
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131   public static <S, R extends Writable>
132   ReducerArrayHandle<S, R> createLocalArrayHandles(
133       int fixedSize, PrimitiveTypeOps<R> typeOps,
134       ReduceOperation<S, R> elementReduceOp,
135       final CreateReducersApi reduceApi) {
136     return createArrayHandles(fixedSize, typeOps, elementReduceOp,
137         new CreateReducerFunctionApi() {
138           @Override
139           public <S, R extends Writable> ReducerHandle<S, R> createReducer(
140               ReduceOperation<S, R> reduceOp) {
141             return reduceApi.createLocalReducer(reduceOp);
142           }
143         });
144   }
145 
146   
147 
148 
149 
150 
151 
152 
153 
154 
155 
156 
157 
158 
159   public static <S, R extends Writable>
160   ReducerArrayHandle<S, R> createArrayHandles(
161       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
162       CreateReducerFunctionApi createFunction) {
163     return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
164   }
165 
166   
167 
168 
169 
170 
171 
172 
173 
174 
175 
176 
177 
178 
179 
180   public static <S, R extends Writable>
181   ReducerArrayHandle<S, R> createArrayHandles(
182       final int fixedSize, final PrimitiveTypeOps<R> typeOps,
183       ReduceOperation<S, R> elementReduceOp,
184       CreateReducerFunctionApi createFunction) {
185     final ReducerHandle<Pair<IntRef, S>, WArrayList<R>> reduceHandle =
186         createFunction.createReducer(
187             new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
188     final IntRef curIndex = new IntRef(0);
189     final R reusableValue = typeOps.create();
190     final R initialValue = elementReduceOp.createInitialValue();
191     final MutablePair<IntRef, S> reusablePair =
192         MutablePair.of(new IntRef(0), null);
193     final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
194       @Override
195       public R getReducedValue(MasterGlobalCommUsage master) {
196         WArrayList<R> result = reduceHandle.getReducedValue(master);
197         if (fixedSize == -1 && curIndex.value >= result.size()) {
198           typeOps.set(reusableValue, initialValue);
199         } else {
200           result.getIntoW(curIndex.value, reusableValue);
201         }
202         return reusableValue;
203       }
204 
205       @Override
206       public void reduce(S valueToReduce) {
207         reusablePair.getLeft().value = curIndex.value;
208         reusablePair.setRight(valueToReduce);
209         reduceHandle.reduce(reusablePair);
210       }
211 
212       @Override
213       public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
214         throw new UnsupportedOperationException();
215       }
216     };
217 
218     return new ReducerArrayHandle<S, R>() {
219       @Override
220       public ReducerHandle<S, R> get(int index) {
221         curIndex.value = index;
222         return elementReduceHandle;
223       }
224 
225       @Override
226       public int getStaticSize() {
227         if (fixedSize == -1) {
228           throw new UnsupportedOperationException(
229               "Cannot call size, when one is not specified upfront");
230         }
231         return fixedSize;
232       }
233 
234       @Override
235       public int getReducedSize(BlockMasterApi master) {
236         return reduceHandle.getReducedValue(master).size();
237       }
238 
239       @Override
240       public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
241         final BroadcastHandle<WArrayList<R>> broadcastHandle =
242             reduceHandle.broadcastValue(master);
243         final IntRef curIndex = new IntRef(0);
244         final R reusableValue = typeOps.create();
245         final BroadcastHandle<R>
246         elementBroadcastHandle = new BroadcastHandle<R>() {
247           @Override
248           public R getBroadcast(WorkerBroadcastUsage worker) {
249             WArrayList<R> result = broadcastHandle.getBroadcast(worker);
250             if (fixedSize == -1 && curIndex.value >= result.size()) {
251               typeOps.set(reusableValue, initialValue);
252             } else {
253               result.getIntoW(curIndex.value, reusableValue);
254             }
255             return reusableValue;
256           }
257         };
258         return new BroadcastArrayHandle<R>() {
259           @Override
260           public BroadcastHandle<R> get(int index) {
261             curIndex.value = index;
262             return elementBroadcastHandle;
263           }
264 
265           @Override
266           public int getStaticSize() {
267             if (fixedSize == -1) {
268               throw new UnsupportedOperationException(
269                   "Cannot call size, when one is not specified upfront");
270             }
271             return fixedSize;
272           }
273 
274           @Override
275           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
276             return broadcastHandle.getBroadcast(worker).size();
277           }
278         };
279       }
280     };
281   }
282 
283 
284   private void init() {
285     initialElement = elementReduceOp.createInitialValue();
286     reusable = typeOps.create();
287     reusable2 = typeOps.create();
288   }
289 
290   @Override
291   public WArrayList<R> createInitialValue() {
292     if (fixedSize != -1) {
293       WArrayList<R> list = typeOps.createArrayList(fixedSize);
294       fill(list, fixedSize);
295       return list;
296     } else {
297       return typeOps.createArrayList(1);
298     }
299   }
300 
301   private void fill(WArrayList<R> list, int newSize) {
302     if (fixedSize != -1 && newSize > fixedSize) {
303       throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
304     }
305 
306     if (list.capacity() < newSize) {
307       list.setCapacity(newSize);
308     }
309     while (list.size() < newSize) {
310       list.addW(initialElement);
311     }
312   }
313 
314   @Override
315   public WArrayList<R> reduce(
316       WArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
317     int index = valueToReduce.getLeft().value;
318     fill(curValue, index + 1);
319     curValue.getIntoW(index, reusable);
320     R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
321     curValue.setW(index, result);
322     return curValue;
323   }
324 
325   @Override
326   public WArrayList<R> reduceMerge(
327       WArrayList<R> curValue, WArrayList<R> valueToReduce) {
328     fill(curValue, valueToReduce.size());
329     for (int i = 0; i < valueToReduce.size(); i++) {
330       valueToReduce.getIntoW(i, reusable2);
331       curValue.getIntoW(i, reusable);
332       R result = elementReduceOp.reduceMerge(reusable, reusable2);
333       curValue.setW(i, result);
334     }
335 
336     return curValue;
337   }
338 
339   @Override
340   public void write(DataOutput out) throws IOException {
341     out.writeInt(fixedSize);
342     TypeOpsUtils.writeTypeOps(typeOps, out);
343     WritableUtils.writeWritableObject(elementReduceOp, out);
344   }
345 
346   @Override
347   public void readFields(DataInput in) throws IOException {
348     fixedSize = in.readInt();
349     typeOps = TypeOpsUtils.readTypeOps(in);
350     elementReduceOp = WritableUtils.readWritableObject(in, null);
351     init();
352   }
353 }