This project has retired. For details please refer to its Attic page.
ReducersForPieceHandler xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.piece.global_comm.internal;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
28  import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
29  import org.apache.giraph.master.MasterGlobalCommUsage;
30  import org.apache.giraph.reducers.ReduceOperation;
31  import org.apache.giraph.reducers.Reducer;
32  import org.apache.giraph.utils.WritableUtils;
33  import org.apache.giraph.worker.WorkerBroadcastUsage;
34  import org.apache.giraph.worker.WorkerReduceUsage;
35  import org.apache.hadoop.io.Writable;
36  
37  /**
38   * All logic for transforming Giraph's reducer API to reducer handles.
39   * Contains state of active reducers, and is kept within a Piece.
40   */
41  public class ReducersForPieceHandler implements VertexSenderObserver {
42    private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger();
43    private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger();
44  
45    private final int handleIndex = HANDLER_COUNTER.incrementAndGet();
46    private final AtomicInteger reduceCounter = new AtomicInteger();
47  
48    private final ArrayList<VertexSenderObserver> observers = new ArrayList<>();
49  
50    @Override
51    public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
52      for (VertexSenderObserver observer : observers) {
53        observer.vertexSenderWorkerPreprocess(usage);
54      }
55    }
56  
57    @Override
58    public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
59      for (VertexSenderObserver observer : observers) {
60        observer.vertexSenderWorkerPostprocess(usage);
61      }
62    }
63  
64    public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer(
65        MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
66        R globalInitialValue) {
67      LocalReduceHandle<S, R> handle = new LocalReduceHandle<>(reduceOp);
68      master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
69      observers.add(handle);
70      return handle;
71    }
72  
73    public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer(
74        MasterGlobalCommUsage master,  ReduceOperation<S, R> reduceOp,
75        R globalInitialValue) {
76      ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp);
77      master.registerReducer(handle.getName(), reduceOp, globalInitialValue);
78      observers.add(handle);
79      return handle;
80    }
81  
82    /**
83     * Implementation of BroadcastHandle
84     *
85     * @param <T> Value type
86     */
87    public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> {
88      private final String name;
89  
90      public BroadcastHandleImpl() {
91        this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet();
92      }
93  
94      public String getName() {
95        return name;
96      }
97  
98      @Override
99      public T getBroadcast(WorkerBroadcastUsage worker) {
100       return worker.getBroadcast(name);
101     }
102   }
103 
104   /**
105    * Parent implementation of ReducerHandle
106    *
107    * @param <S> Single value type
108    * @param <R> Reduced value type
109    */
110   public abstract class ReduceHandleImpl<S, R extends Writable>
111       implements ReducerHandle<S, R>, VertexSenderObserver {
112     protected final ReduceOperation<S, R> reduceOp;
113     private final String name;
114 
115     private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) {
116       this.reduceOp = reduceOp;
117       name = "_utils." + handleIndex +
118           ".reduce." + reduceCounter.incrementAndGet();
119     }
120 
121     public String getName() {
122       return name;
123     }
124 
125     @Override
126     public R getReducedValue(MasterGlobalCommUsage master) {
127       return master.getReduced(name);
128     }
129 
130     @Override
131     public BroadcastHandle<R> broadcastValue(BlockMasterApi master) {
132       return unwrapHandle(master.broadcast(
133           new WrappedReducedValue<>(reduceOp, getReducedValue(master))));
134     }
135   }
136 
137   private static <R extends Writable> BroadcastHandle<R> unwrapHandle(
138       final BroadcastHandle<WrappedReducedValue<R>> handle) {
139     return new BroadcastHandle<R>() {
140       @Override
141       public R getBroadcast(WorkerBroadcastUsage worker) {
142         return handle.getBroadcast(worker).getValue();
143       }
144     };
145   }
146 
147   /**
148    * Wrapper that makes reduced values self-serializable,
149    * and allows them to be broadcasted.
150    *
151    * @param <R> Reduced value type
152    */
153   public static class WrappedReducedValue<R extends Writable>
154       implements Writable {
155     private ReduceOperation<?, R> reduceOp;
156     private R value;
157 
158     public WrappedReducedValue() {
159     }
160 
161     public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) {
162       this.reduceOp = reduceOp;
163       this.value = value;
164     }
165 
166     @Override
167     public void write(DataOutput out) throws IOException {
168       WritableUtils.writeWritableObject(reduceOp, out);
169       value.write(out);
170     }
171 
172     @Override
173     public void readFields(DataInput in) throws IOException {
174       reduceOp = WritableUtils.readWritableObject(in, null);
175       value = reduceOp.createInitialValue();
176       value.readFields(in);
177     }
178 
179     public R getValue() {
180       return value;
181     }
182   }
183 
184   /**
185    * Global Reduce Handle is implementation of ReducerHandle, that will keep
186    * only one value for each worker, and each call to reduce will have
187    * to obtain a global lock, and incur synchronization costs.
188    * Use only when objects are so large, that having many copies cannot fit
189    * into memory.
190    *
191    * @param <S> Single value type
192    * @param <R> Reduced value type
193    */
194   public class GlobalReduceHandle<S, R extends Writable>
195       extends ReduceHandleImpl<S, R> {
196     private transient WorkerReduceUsage usage;
197 
198     public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) {
199       super(reduceOp);
200     }
201 
202     @Override
203     public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
204       this.usage = usage;
205     }
206 
207     @Override
208     public void reduce(S valueToReduce) {
209       usage.reduce(getName(), valueToReduce);
210     }
211 
212     @Override
213     public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
214     }
215   }
216 
217   /**
218    * Local Reduce Handle is implementation of ReducerHandle, that will make a
219    * partially reduced value on each worker thread, which are at the end
220    * reduced all together.
221    * This is preferred implementation, unless it cannot be used due to memory
222    * overhead, because all partially reduced values will not fit the memory.
223    *
224    * @param <S> Single value type
225    * @param <R> Reduced value type
226    */
227   public class LocalReduceHandle<S, R extends Writable>
228       extends ReduceHandleImpl<S, R> {
229     private transient Reducer<S, R> reducer;
230 
231     public LocalReduceHandle(ReduceOperation<S, R> reduceOp) {
232       super(reduceOp);
233     }
234 
235     @Override
236     public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) {
237       this.reducer = new Reducer<>(reduceOp);
238     }
239 
240     @Override
241     public void reduce(S valueToReduce) {
242       reducer.reduce(valueToReduce);
243     }
244 
245     @Override
246     public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) {
247       usage.reduceMerge(getName(), reducer.getCurrentValue());
248     }
249   }
250 }