This project has retired. For details please refer to its Attic page.
BasicArrayReduce xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.commons.lang3.tuple.MutablePair;
25  import org.apache.commons.lang3.tuple.Pair;
26  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
28  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
29  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
33  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
34  import org.apache.giraph.master.MasterGlobalCommUsage;
35  import org.apache.giraph.reducers.ReduceOperation;
36  import org.apache.giraph.types.ops.PrimitiveTypeOps;
37  import org.apache.giraph.types.ops.TypeOpsUtils;
38  import org.apache.giraph.types.ops.collections.array.WArrayList;
39  import org.apache.giraph.utils.WritableUtils;
40  import org.apache.giraph.worker.WorkerBroadcastUsage;
41  import org.apache.hadoop.io.Writable;
42  
43  /**
44   * Efficient generic primitive array reduce operation.
45   *
46   * Allows two modes - fixed size, and infinite size
47   * (with keeping only actually used elements and resizing)
48   *
49   * @param <S> Single value type
50   * @param <R> Reduced value type
51   */
52  public class BasicArrayReduce<S, R extends Writable>
53      implements ReduceOperation<Pair<IntRef, S>, WArrayList<R>> {
54    private int fixedSize;
55    private PrimitiveTypeOps<R> typeOps;
56    private ReduceOperation<S, R> elementReduceOp;
57    private R initialElement;
58    private R reusable;
59    private R reusable2;
60  
61    public BasicArrayReduce() {
62    }
63  
64  
65    /**
66     * Create ReduceOperation that reduces BasicArrays by reducing individual
67     * elements, with predefined size.
68     *
69     * @param fixedSize Number of elements
70     * @param typeOps TypeOps of individual elements
71     * @param elementReduceOp ReduceOperation for individual elements
72     */
73    public BasicArrayReduce(
74        int fixedSize,
75        PrimitiveTypeOps<R> typeOps,
76        ReduceOperation<S, R> elementReduceOp) {
77      this.fixedSize = fixedSize;
78      this.typeOps = typeOps;
79      this.elementReduceOp = elementReduceOp;
80      init();
81    }
82  
83  
84    /**
85     * Create ReduceOperation that reduces BasicArrays by reducing individual
86     * elements, with unbounded size.
87     *
88     * @param typeOps TypeOps of individual elements
89     * @param elementReduceOp ReduceOperation for individual elements
90     */
91    public BasicArrayReduce(
92        PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp) {
93      this(-1, typeOps, elementReduceOp);
94    }
95  
96  
97    /**
98     * Registers one new local reducer, that will reduce BasicArray,
99     * by reducing individual elements using {@code elementReduceOp},
100    * with unbounded size.
101    *
102    * This function will return ReducerArrayHandle, by which
103    * individual elements can be manipulated separately.
104    *
105    * @param typeOps TypeOps of individual elements
106    * @param elementReduceOp ReduceOperation for individual elements
107    * @param reduceApi API for creating reducers
108    * @return Created ReducerArrayHandle
109    */
110   public static <S, R extends Writable>
111   ReducerArrayHandle<S, R> createLocalArrayHandles(
112       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
113       CreateReducersApi reduceApi) {
114     return createLocalArrayHandles(-1, typeOps, elementReduceOp, reduceApi);
115   }
116 
117   /**
118    * Registers one new local reducer, that will reduce BasicArray,
119    * by reducing individual elements using {@code elementReduceOp},
120    * with predefined size.
121    *
122    * This function will return ReducerArrayHandle, by which
123    * individual elements can be manipulated separately.
124    *
125    * @param fixedSize Number of elements
126    * @param typeOps TypeOps of individual elements
127    * @param elementReduceOp ReduceOperation for individual elements
128    * @param reduceApi API for creating reducers
129    * @return Created ReducerArrayHandle
130    */
131   public static <S, R extends Writable>
132   ReducerArrayHandle<S, R> createLocalArrayHandles(
133       int fixedSize, PrimitiveTypeOps<R> typeOps,
134       ReduceOperation<S, R> elementReduceOp,
135       final CreateReducersApi reduceApi) {
136     return createArrayHandles(fixedSize, typeOps, elementReduceOp,
137         new CreateReducerFunctionApi() {
138           @Override
139           public <S, R extends Writable> ReducerHandle<S, R> createReducer(
140               ReduceOperation<S, R> reduceOp) {
141             return reduceApi.createLocalReducer(reduceOp);
142           }
143         });
144   }
145 
146   /**
147    * Registers one new reducer, that will reduce BasicArray,
148    * by reducing individual elements using {@code elementReduceOp},
149    * with unbounded size.
150    *
151    * This function will return ReducerArrayHandle, by which
152    * individual elements can be manipulated separately.
153    *
154    * @param typeOps TypeOps of individual elements
155    * @param elementReduceOp ReduceOperation for individual elements
156    * @param createFunction Function for creating a reducer
157    * @return Created ReducerArrayHandle
158    */
159   public static <S, R extends Writable>
160   ReducerArrayHandle<S, R> createArrayHandles(
161       PrimitiveTypeOps<R> typeOps, ReduceOperation<S, R> elementReduceOp,
162       CreateReducerFunctionApi createFunction) {
163     return createArrayHandles(-1, typeOps, elementReduceOp, createFunction);
164   }
165 
166   /**
167    * Registers one new reducer, that will reduce BasicArray,
168    * by reducing individual elements using {@code elementReduceOp},
169    * with predefined size.
170    *
171    * This function will return ReducerArrayHandle, by which
172    * individual elements can be manipulated separately.
173    *
174    * @param fixedSize Number of elements
175    * @param typeOps TypeOps of individual elements
176    * @param elementReduceOp ReduceOperation for individual elements
177    * @param createFunction Function for creating a reducer
178    * @return Created ReducerArrayHandle
179    */
180   public static <S, R extends Writable>
181   ReducerArrayHandle<S, R> createArrayHandles(
182       final int fixedSize, final PrimitiveTypeOps<R> typeOps,
183       ReduceOperation<S, R> elementReduceOp,
184       CreateReducerFunctionApi createFunction) {
185     final ReducerHandle<Pair<IntRef, S>, WArrayList<R>> reduceHandle =
186         createFunction.createReducer(
187             new BasicArrayReduce<>(fixedSize, typeOps, elementReduceOp));
188     final IntRef curIndex = new IntRef(0);
189     final R reusableValue = typeOps.create();
190     final R initialValue = elementReduceOp.createInitialValue();
191     final MutablePair<IntRef, S> reusablePair =
192         MutablePair.of(new IntRef(0), null);
193     final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
194       @Override
195       public R getReducedValue(MasterGlobalCommUsage master) {
196         WArrayList<R> result = reduceHandle.getReducedValue(master);
197         if (fixedSize == -1 && curIndex.value >= result.size()) {
198           typeOps.set(reusableValue, initialValue);
199         } else {
200           result.getIntoW(curIndex.value, reusableValue);
201         }
202         return reusableValue;
203       }
204 
205       @Override
206       public void reduce(S valueToReduce) {
207         reusablePair.getLeft().value = curIndex.value;
208         reusablePair.setRight(valueToReduce);
209         reduceHandle.reduce(reusablePair);
210       }
211 
212       @Override
213       public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
214         throw new UnsupportedOperationException();
215       }
216     };
217 
218     return new ReducerArrayHandle<S, R>() {
219       @Override
220       public ReducerHandle<S, R> get(int index) {
221         curIndex.value = index;
222         return elementReduceHandle;
223       }
224 
225       @Override
226       public int getStaticSize() {
227         if (fixedSize == -1) {
228           throw new UnsupportedOperationException(
229               "Cannot call size, when one is not specified upfront");
230         }
231         return fixedSize;
232       }
233 
234       @Override
235       public int getReducedSize(BlockMasterApi master) {
236         return reduceHandle.getReducedValue(master).size();
237       }
238 
239       @Override
240       public BroadcastArrayHandle<R> broadcastValue(BlockMasterApi master) {
241         final BroadcastHandle<WArrayList<R>> broadcastHandle =
242             reduceHandle.broadcastValue(master);
243         final IntRef curIndex = new IntRef(0);
244         final R reusableValue = typeOps.create();
245         final BroadcastHandle<R>
246         elementBroadcastHandle = new BroadcastHandle<R>() {
247           @Override
248           public R getBroadcast(WorkerBroadcastUsage worker) {
249             WArrayList<R> result = broadcastHandle.getBroadcast(worker);
250             if (fixedSize == -1 && curIndex.value >= result.size()) {
251               typeOps.set(reusableValue, initialValue);
252             } else {
253               result.getIntoW(curIndex.value, reusableValue);
254             }
255             return reusableValue;
256           }
257         };
258         return new BroadcastArrayHandle<R>() {
259           @Override
260           public BroadcastHandle<R> get(int index) {
261             curIndex.value = index;
262             return elementBroadcastHandle;
263           }
264 
265           @Override
266           public int getStaticSize() {
267             if (fixedSize == -1) {
268               throw new UnsupportedOperationException(
269                   "Cannot call size, when one is not specified upfront");
270             }
271             return fixedSize;
272           }
273 
274           @Override
275           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
276             return broadcastHandle.getBroadcast(worker).size();
277           }
278         };
279       }
280     };
281   }
282 
283 
284   private void init() {
285     initialElement = elementReduceOp.createInitialValue();
286     reusable = typeOps.create();
287     reusable2 = typeOps.create();
288   }
289 
290   @Override
291   public WArrayList<R> createInitialValue() {
292     if (fixedSize != -1) {
293       WArrayList<R> list = typeOps.createArrayList(fixedSize);
294       fill(list, fixedSize);
295       return list;
296     } else {
297       return typeOps.createArrayList(1);
298     }
299   }
300 
301   private void fill(WArrayList<R> list, int newSize) {
302     if (fixedSize != -1 && newSize > fixedSize) {
303       throw new IllegalArgumentException(newSize + " larger then " + fixedSize);
304     }
305 
306     if (list.capacity() < newSize) {
307       list.setCapacity(newSize);
308     }
309     while (list.size() < newSize) {
310       list.addW(initialElement);
311     }
312   }
313 
314   @Override
315   public WArrayList<R> reduce(
316       WArrayList<R> curValue, Pair<IntRef, S> valueToReduce) {
317     int index = valueToReduce.getLeft().value;
318     fill(curValue, index + 1);
319     curValue.getIntoW(index, reusable);
320     R result = elementReduceOp.reduce(reusable, valueToReduce.getRight());
321     curValue.setW(index, result);
322     return curValue;
323   }
324 
325   @Override
326   public WArrayList<R> reduceMerge(
327       WArrayList<R> curValue, WArrayList<R> valueToReduce) {
328     fill(curValue, valueToReduce.size());
329     for (int i = 0; i < valueToReduce.size(); i++) {
330       valueToReduce.getIntoW(i, reusable2);
331       curValue.getIntoW(i, reusable);
332       R result = elementReduceOp.reduceMerge(reusable, reusable2);
333       curValue.setW(i, result);
334     }
335 
336     return curValue;
337   }
338 
339   @Override
340   public void write(DataOutput out) throws IOException {
341     out.writeInt(fixedSize);
342     TypeOpsUtils.writeTypeOps(typeOps, out);
343     WritableUtils.writeWritableObject(elementReduceOp, out);
344   }
345 
346   @Override
347   public void readFields(DataInput in) throws IOException {
348     fixedSize = in.readInt();
349     typeOps = TypeOpsUtils.readTypeOps(in);
350     elementReduceOp = WritableUtils.readWritableObject(in, null);
351     init();
352   }
353 }