This project has retired. For details please refer to its Attic page.
BasicMapReduce xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.map;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.Iterator;
24  
25  import org.apache.commons.lang3.tuple.MutablePair;
26  import org.apache.commons.lang3.tuple.Pair;
27  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
28  import org.apache.giraph.block_app.framework.api.CreateReducersApi;
29  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
30  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
31  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
32  import org.apache.giraph.block_app.framework.piece.global_comm.map.BroadcastMapHandle;
33  import org.apache.giraph.block_app.framework.piece.global_comm.map.ReducerMapHandle;
34  import org.apache.giraph.master.MasterGlobalCommUsage;
35  import org.apache.giraph.reducers.ReduceOperation;
36  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
37  import org.apache.giraph.types.ops.PrimitiveTypeOps;
38  import org.apache.giraph.types.ops.TypeOpsUtils;
39  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
40  import org.apache.giraph.types.ops.collections.WritableWriter;
41  import org.apache.giraph.utils.WritableUtils;
42  import org.apache.giraph.worker.WorkerBroadcastUsage;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  
46  
47  /**
48   * Efficient generic primitive map of values reduce operation.
49   * (it is BasicMap Reduce, not to be confused with MapReduce)
50   *
51   * @param <K> Key type
52   * @param <S> Single value type
53   * @param <R> Reduced value type
54   */
55  public class BasicMapReduce<K extends WritableComparable, S,
56      R extends Writable>
57      implements ReduceOperation<Pair<K, S>, Basic2ObjectMap<K, R>> {
58    private PrimitiveIdTypeOps<K> keyTypeOps;
59    private PrimitiveTypeOps<R> typeOps;
60    private ReduceOperation<S, R> elementReduceOp;
61    private WritableWriter<R> writer;
62  
63    public BasicMapReduce() {
64    }
65  
66    /**
67     * Create ReduceOperation that reduces BasicMaps by reducing individual
68     * elements corresponding to the same key.
69     *
70     * @param keyTypeOps TypeOps of keys
71     * @param typeOps TypeOps of individual elements
72     * @param elementReduceOp ReduceOperation for individual elements
73     */
74    public BasicMapReduce(
75        PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
76        ReduceOperation<S, R> elementReduceOp) {
77      this.keyTypeOps = keyTypeOps;
78      this.typeOps = typeOps;
79      this.elementReduceOp = elementReduceOp;
80      init();
81    }
82  
83    /**
84     * Registers one new local reducer, that will reduce BasicMap,
85     * by reducing individual elements corresponding to the same key
86     * using {@code elementReduceOp}.
87     *
88     * This function will return ReducerMapHandle, by which
89     * individual elements can be manipulated separately.
90     *
91     * @param keyTypeOps TypeOps of keys
92     * @param typeOps TypeOps of individual elements
93     * @param elementReduceOp ReduceOperation for individual elements
94     * @param reduceApi API for creating reducers
95     * @return Created ReducerMapHandle
96     */
97    public static <K extends WritableComparable, S, R extends Writable>
98    ReducerMapHandle<K, S, R> createLocalMapHandles(
99        PrimitiveIdTypeOps<K> keyTypeOps, PrimitiveTypeOps<R> typeOps,
100       ReduceOperation<S, R> elementReduceOp,
101       final CreateReducersApi reduceApi) {
102     return createMapHandles(
103         keyTypeOps, typeOps, elementReduceOp,
104         new CreateReducerFunctionApi() {
105           @Override
106           public <S, R extends Writable> ReducerHandle<S, R> createReducer(
107               ReduceOperation<S, R> reduceOp) {
108             return reduceApi.createLocalReducer(reduceOp);
109           }
110         });
111   }
112 
113   /**
114    * Registers one new reducer, that will reduce BasicMap,
115    * by reducing individual elements corresponding to the same key
116    * using {@code elementReduceOp}.
117    *
118    * This function will return ReducerMapHandle, by which
119    * individual elements can be manipulated separately.
120    *
121    * @param keyTypeOps TypeOps of keys
122    * @param typeOps TypeOps of individual elements
123    * @param elementReduceOp ReduceOperation for individual elements
124    * @param createFunction Function for creating a reducer
125    * @return Created ReducerMapHandle
126    */
127   public static <K extends WritableComparable, S, R extends Writable>
128   ReducerMapHandle<K, S, R> createMapHandles(
129       final PrimitiveIdTypeOps<K> keyTypeOps, final PrimitiveTypeOps<R> typeOps,
130       ReduceOperation<S, R> elementReduceOp,
131       CreateReducerFunctionApi createFunction) {
132     final ReducerHandle<Pair<K, S>, Basic2ObjectMap<K, R>> reduceHandle =
133       createFunction.createReducer(
134           new BasicMapReduce<>(keyTypeOps, typeOps, elementReduceOp));
135     final K curIndex = keyTypeOps.create();
136     final R reusableValue = typeOps.create();
137     final R initialValue = elementReduceOp.createInitialValue();
138     final MutablePair<K, S> reusablePair = MutablePair.of(null, null);
139     final ReducerHandle<S, R> elementReduceHandle = new ReducerHandle<S, R>() {
140       @Override
141       public R getReducedValue(MasterGlobalCommUsage master) {
142         Basic2ObjectMap<K, R> result = reduceHandle.getReducedValue(master);
143         R value = result.get(curIndex);
144         if (value == null) {
145           typeOps.set(reusableValue, initialValue);
146         } else {
147           typeOps.set(reusableValue, value);
148         }
149         return reusableValue;
150       }
151 
152       @Override
153       public void reduce(S valueToReduce) {
154         reusablePair.setLeft(curIndex);
155         reusablePair.setRight(valueToReduce);
156         reduceHandle.reduce(reusablePair);
157       }
158 
159       @Override
160       public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
161         throw new UnsupportedOperationException();
162       }
163     };
164 
165     return new ReducerMapHandle<K, S, R>() {
166       @Override
167       public ReducerHandle<S, R> get(K key) {
168         keyTypeOps.set(curIndex, key);
169         return elementReduceHandle;
170       }
171 
172       @Override
173       public int getReducedSize(BlockMasterApi master) {
174         return reduceHandle.getReducedValue(master).size();
175       }
176 
177       @Override
178       public BroadcastMapHandle<K, R> broadcastValue(BlockMasterApi master) {
179         final BroadcastHandle<Basic2ObjectMap<K, R>> broadcastHandle =
180           reduceHandle.broadcastValue(master);
181         final K curIndex = keyTypeOps.create();
182         final R reusableValue = typeOps.create();
183         final BroadcastHandle<R>
184         elementBroadcastHandle = new BroadcastHandle<R>() {
185           @Override
186           public R getBroadcast(WorkerBroadcastUsage worker) {
187             Basic2ObjectMap<K, R> result = broadcastHandle.getBroadcast(worker);
188             R value = result.get(curIndex);
189             if (value == null) {
190               typeOps.set(reusableValue, initialValue);
191             } else {
192               typeOps.set(reusableValue, value);
193             }
194             return reusableValue;
195           }
196         };
197         return new BroadcastMapHandle<K, R>() {
198           @Override
199           public BroadcastHandle<R> get(K key) {
200             keyTypeOps.set(curIndex, key);
201             return elementBroadcastHandle;
202           }
203 
204           @Override
205           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
206             return broadcastHandle.getBroadcast(worker).size();
207           }
208         };
209       }
210     };
211   }
212 
213   private void init() {
214     writer = new WritableWriter<R>() {
215       @Override
216       public void write(DataOutput out, R value) throws IOException {
217         value.write(out);
218       }
219 
220       @Override
221       public R readFields(DataInput in) throws IOException {
222         R result = typeOps.create();
223         result.readFields(in);
224         return result;
225       }
226     };
227   }
228 
229   @Override
230   public Basic2ObjectMap<K, R> createInitialValue() {
231     return keyTypeOps.create2ObjectOpenHashMap(writer);
232   }
233 
234   @Override
235   public Basic2ObjectMap<K, R> reduce(
236       Basic2ObjectMap<K, R> curValue, Pair<K, S> valueToReduce) {
237     R result = curValue.get(valueToReduce.getLeft());
238     if (result == null) {
239       result = typeOps.create();
240     }
241     result = elementReduceOp.reduce(result, valueToReduce.getRight());
242     curValue.put(valueToReduce.getLeft(), result);
243     return curValue;
244   }
245 
246   @Override
247   public Basic2ObjectMap<K, R> reduceMerge(
248       Basic2ObjectMap<K, R> curValue, Basic2ObjectMap<K, R> valueToReduce) {
249     for (Iterator<K> iter = valueToReduce.fastKeyIterator(); iter.hasNext();) {
250       K key = iter.next();
251 
252       R result = curValue.get(key);
253       if (result == null) {
254         result = typeOps.create();
255       }
256       result = elementReduceOp.reduceMerge(result, valueToReduce.get(key));
257       curValue.put(key, result);
258     }
259     return curValue;
260   }
261 
262   @Override
263   public void write(DataOutput out) throws IOException {
264     TypeOpsUtils.writeTypeOps(keyTypeOps, out);
265     TypeOpsUtils.writeTypeOps(typeOps, out);
266     WritableUtils.writeWritableObject(elementReduceOp, out);
267   }
268 
269   @Override
270   public void readFields(DataInput in) throws IOException {
271     keyTypeOps = TypeOpsUtils.readTypeOps(in);
272     typeOps = TypeOpsUtils.readTypeOps(in);
273     elementReduceOp = WritableUtils.readWritableObject(in, null);
274     init();
275   }
276 }