This project has retired. For details please refer to its Attic page.
ArrayReduce xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.reducers.array;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.lang.reflect.Array;
24  
25  import org.apache.commons.lang3.tuple.MutablePair;
26  import org.apache.commons.lang3.tuple.Pair;
27  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
28  import org.apache.giraph.block_app.framework.api.CreateReducersApi.CreateReducerFunctionApi;
29  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
31  import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
32  import org.apache.giraph.block_app.framework.piece.global_comm.array.ReducerArrayHandle;
33  import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
34  import org.apache.giraph.master.MasterGlobalCommUsage;
35  import org.apache.giraph.reducers.ReduceOperation;
36  import org.apache.giraph.utils.ArrayWritable;
37  import org.apache.giraph.utils.WritableUtils;
38  import org.apache.giraph.worker.WorkerBroadcastUsage;
39  import org.apache.hadoop.io.Writable;
40  
41  /**
42   * One reducer representing reduction of array of individual values.
43   * Elements are represented as object, and so BasicArrayReduce should be
44   * used instead when elements are primitive types.
45   *
46   * @param <S> Single value type, objects passed on workers
47   * @param <R> Reduced value type
48   */
49  public class ArrayReduce<S, R extends Writable>
50      implements ReduceOperation<Pair<IntRef, S>, ArrayWritable<R>> {
51    private int fixedSize;
52    private ReduceOperation<S, R> elementReduceOp;
53    private Class<R> elementClass;
54  
55    public ArrayReduce() {
56    }
57  
58    /**
59     * Create ReduceOperation that reduces arrays by reducing individual
60     * elements.
61     *
62     * @param fixedSize Number of elements
63     * @param elementReduceOp ReduceOperation for individual elements
64     */
65    public ArrayReduce(int fixedSize, ReduceOperation<S, R> elementReduceOp) {
66      this.fixedSize = fixedSize;
67      this.elementReduceOp = elementReduceOp;
68      init();
69    }
70  
71    /**
72     * Registers one new reducer, that will reduce array of objects,
73     * by reducing individual elements using {@code elementReduceOp}.
74     *
75     * This function will return ReducerArrayHandle to it, by which
76     * individual elements can be manipulated separately.
77     *
78     * @param fixedSize Number of elements
79     * @param elementReduceOp ReduceOperation for individual elements
80     * @param createFunction Function for creating a reducer
81     * @return Created ReducerArrayHandle
82     */
83    public static <S, T extends Writable>
84    ReducerArrayHandle<S, T> createArrayHandles(
85        final int fixedSize, ReduceOperation<S, T> elementReduceOp,
86        CreateReducerFunctionApi createFunction) {
87      final ReducerHandle<Pair<IntRef, S>, ArrayWritable<T>> reduceHandle =
88          createFunction.createReducer(
89              new ArrayReduce<>(fixedSize, elementReduceOp));
90  
91      final IntRef curIndex = new IntRef(0);
92      final MutablePair<IntRef, S> reusablePair =
93          MutablePair.of(new IntRef(0), null);
94      final ReducerHandle<S, T> elementReduceHandle = new ReducerHandle<S, T>() {
95        @Override
96        public T getReducedValue(MasterGlobalCommUsage master) {
97          ArrayWritable<T> result = reduceHandle.getReducedValue(master);
98          return result.get()[curIndex.value];
99        }
100 
101       @Override
102       public void reduce(S valueToReduce) {
103         reusablePair.getLeft().value = curIndex.value;
104         reusablePair.setRight(valueToReduce);
105         reduceHandle.reduce(reusablePair);
106       }
107 
108       @Override
109       public BroadcastHandle<T> broadcastValue(BlockMasterApi master) {
110         throw new UnsupportedOperationException();
111       }
112     };
113 
114     return new ReducerArrayHandle<S, T>() {
115       @Override
116       public ReducerHandle<S, T> get(int index) {
117         curIndex.value = index;
118         return elementReduceHandle;
119       }
120 
121       @Override
122       public int getStaticSize() {
123         return fixedSize;
124       }
125 
126       @Override
127       public int getReducedSize(BlockMasterApi master) {
128         return getStaticSize();
129       }
130 
131       @Override
132       public BroadcastArrayHandle<T> broadcastValue(BlockMasterApi master) {
133         final BroadcastHandle<ArrayWritable<T>> broadcastHandle =
134             reduceHandle.broadcastValue(master);
135         final IntRef curIndex = new IntRef(0);
136         final BroadcastHandle<T>
137         elementBroadcastHandle = new BroadcastHandle<T>() {
138           @Override
139           public T getBroadcast(WorkerBroadcastUsage worker) {
140             ArrayWritable<T> result = broadcastHandle.getBroadcast(worker);
141             return result.get()[curIndex.value];
142           }
143         };
144         return new BroadcastArrayHandle<T>() {
145           @Override
146           public BroadcastHandle<T> get(int index) {
147             curIndex.value = index;
148             return elementBroadcastHandle;
149           }
150 
151           @Override
152           public int getStaticSize() {
153             return fixedSize;
154           }
155 
156           @Override
157           public int getBroadcastedSize(WorkerBroadcastUsage worker) {
158             return getStaticSize();
159           }
160         };
161       }
162     };
163   }
164 
165   private void init() {
166     elementClass = (Class<R>) elementReduceOp.createInitialValue().getClass();
167   }
168 
169   @Override
170   public ArrayWritable<R> createInitialValue() {
171     R[] values = (R[]) Array.newInstance(elementClass, fixedSize);
172     for (int i = 0; i < fixedSize; i++) {
173       values[i] = elementReduceOp.createInitialValue();
174     }
175     return new ArrayWritable<>(elementClass, values);
176   }
177 
178   @Override
179   public ArrayWritable<R> reduce(
180       ArrayWritable<R> curValue, Pair<IntRef, S> valueToReduce) {
181     int index = valueToReduce.getLeft().value;
182     curValue.get()[index] =
183         elementReduceOp.reduce(curValue.get()[index], valueToReduce.getRight());
184     return curValue;
185   }
186 
187   @Override
188   public ArrayWritable<R> reduceMerge(
189       ArrayWritable<R> curValue, ArrayWritable<R> valueToReduce) {
190     for (int i = 0; i < fixedSize; i++) {
191       curValue.get()[i] =
192           elementReduceOp.reduceMerge(
193               curValue.get()[i], valueToReduce.get()[i]);
194     }
195     return curValue;
196   }
197 
198   @Override
199   public void write(DataOutput out) throws IOException {
200     out.writeInt(fixedSize);
201     WritableUtils.writeWritableObject(elementReduceOp, out);
202   }
203 
204   @Override
205   public void readFields(DataInput in) throws IOException {
206     fixedSize = in.readInt();
207     elementReduceOp = WritableUtils.readWritableObject(in, null);
208     init();
209   }
210 
211 }