View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.internal;
19  
20  import java.util.HashSet;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Map.Entry;
24  import java.util.TreeMap;
25  
26  import org.apache.commons.lang3.time.DurationFormatUtils;
27  import org.apache.giraph.block_app.framework.BlockFactory;
28  import org.apache.giraph.block_app.framework.BlockUtils;
29  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
30  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
31  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
32  import org.apache.giraph.block_app.framework.block.Block;
33  import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
34  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
35  import org.apache.giraph.conf.GiraphConfiguration;
36  import org.apache.giraph.function.Consumer;
37  import org.apache.giraph.writable.tuple.IntLongWritable;
38  import org.apache.log4j.Logger;
39  import org.python.google.common.base.Preconditions;
40  
41  /**
42   * Block execution logic on master, iterating over Pieces of the
43   * application Block, executing master logic, and providing what needs to be
44   * executed on the workers.
45   *
46   * @param <S> Execution stage type
47   */
48  @SuppressWarnings("rawtypes")
49  public class BlockMasterLogic<S> {
50    private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class);
51  
52    private Iterator<AbstractPiece> pieceIterator;
53    private PairedPieceAndStage<S> previousPiece;
54    private transient BlockMasterApi masterApi;
55    private long lastTimestamp = -1;
56    private BlockWorkerPieces previousWorkerPieces;
57    private boolean computationDone;
58    private BlockApiHandle blockApiHandle;
59  
60    /** Tracks elapsed time on master for each distinct Piece */
61    private final TimeStatsPerEvent masterPerPieceTimeStats =
62        new TimeStatsPerEvent("master");
63    /** Tracks elapsed time on workers for each pair of recieve/send pieces. */
64    private final TimeStatsPerEvent workerPerPieceTimeStats =
65        new TimeStatsPerEvent("worker");
66  
67    /**
68     * Initialize master logic to execute BlockFactory defined in
69     * the configuration.
70     */
71    public void initialize(
72        GiraphConfiguration conf, final BlockMasterApi masterApi) {
73      BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
74      initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
75          masterApi);
76    }
77  
78    /**
79     * Initialize Master Logic to execute given block, starting
80     * with given executionStage.
81     */
82    public void initialize(
83        Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
84      this.masterApi = masterApi;
85      this.computationDone = false;
86  
87      LOG.info("Executing application - " + executionBlock);
88      if (executionBlock instanceof BlockWithApiHandle) {
89        blockApiHandle =
90          ((BlockWithApiHandle) executionBlock).getBlockApiHandle();
91      }
92      if (blockApiHandle == null) {
93        blockApiHandle = new BlockApiHandle();
94      }
95      blockApiHandle.setMasterApi(masterApi);
96  
97      // We register all possible aggregators at the beginning
98      executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
99        private final HashSet<AbstractPiece> registeredPieces = new HashSet<>();
100       @SuppressWarnings("deprecation")
101       @Override
102       public void apply(AbstractPiece piece) {
103         // no need to register the same piece twice.
104         if (registeredPieces.add(piece)) {
105           try {
106             piece.registerAggregators(masterApi);
107           } catch (InstantiationException | IllegalAccessException e) {
108             throw new RuntimeException(e);
109           }
110         }
111       }
112     });
113 
114     pieceIterator = executionBlock.iterator();
115     // Invariant is that ReceiveWorkerPiece of previousPiece has already been
116     // executed and that previousPiece.nextExecutionStage() should be used for
117     // iterating. So passing piece as null, and initial state as current state,
118     // so that nothing get's executed in first half, and calculateNextState
119     // returns initial state.
120     previousPiece = new PairedPieceAndStage<>(null, executionStage);
121   }
122 
123   /**
124    * Initialize object after deserializing it.
125    * BlockMasterApi is not serializable, so it is transient, and set via this
126    * method afterwards.
127    */
128   public void initializeAfterRead(BlockMasterApi masterApi) {
129     this.masterApi = masterApi;
130   }
131 
132   /**
133    * Executes operations on master (master compute and registering reducers),
134    * and calculates next pieces to be exectued on workers.
135    *
136    * @param superstep Current superstep
137    * @return Next BlockWorkerPieces to be executed on workers, or null
138    *         if computation should be halted.
139    */
140   public BlockWorkerPieces<S> computeNext(long superstep) {
141     long beforeMaster = System.currentTimeMillis();
142     if (lastTimestamp != -1) {
143       BlockCounters.setWorkerTimeCounter(
144           previousWorkerPieces, superstep - 1,
145           beforeMaster - lastTimestamp, masterApi, workerPerPieceTimeStats);
146     }
147 
148     if (previousPiece == null) {
149       postApplication();
150       return null;
151     } else {
152       boolean logExecutionStatus =
153           BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
154       if (logExecutionStatus) {
155         LOG.info("Master executing " + previousPiece +
156             ", in superstep " + superstep);
157       }
158       previousPiece.masterCompute(masterApi);
159       ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
160           returnAllWriters();
161       long afterMaster = System.currentTimeMillis();
162 
163       if (previousPiece.getPiece() != null) {
164         BlockCounters.setMasterTimeCounter(
165             previousPiece, superstep, afterMaster - beforeMaster, masterApi,
166             masterPerPieceTimeStats);
167       }
168 
169       PairedPieceAndStage<S> nextPiece;
170       if (pieceIterator.hasNext()) {
171         nextPiece = new PairedPieceAndStage<S>(
172             pieceIterator.next(), previousPiece.nextExecutionStage());
173         nextPiece.registerReducers(masterApi);
174       } else {
175         nextPiece = null;
176       }
177       BlockCounters.setStageCounters(
178           "Master finished stage: ", previousPiece.getExecutionStage(),
179           masterApi);
180       if (logExecutionStatus) {
181         LOG.info(
182             "Master passing next " + nextPiece + ", in superstep " + superstep);
183       }
184 
185       // if there is nothing more to compute, no need for additional superstep
186       // this can only happen if application uses no pieces.
187       BlockWorkerPieces<S> result;
188       if (previousPiece.getPiece() == null && nextPiece == null) {
189         postApplication();
190         result = null;
191       } else {
192         result = new BlockWorkerPieces<>(
193           previousPiece, nextPiece, blockApiHandle);
194         if (logExecutionStatus) {
195           LOG.info("Master in " + superstep + " superstep passing " +
196               result + " to be executed");
197         }
198       }
199 
200       previousPiece = nextPiece;
201       lastTimestamp = afterMaster;
202       previousWorkerPieces = result;
203       return result;
204     }
205   }
206 
207   /**
208    * Clean up any master state, after application has finished.
209    */
210   private void postApplication() {
211     ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
212         closeAllWriters();
213     Preconditions.checkState(!computationDone);
214     computationDone = true;
215     IntLongWritable masterTimes = masterPerPieceTimeStats.logTimeSums();
216     IntLongWritable workerTimes = workerPerPieceTimeStats.logTimeSums();
217     LOG.info("Time split:\n" +
218         TimeStatsPerEvent.header() +
219         TimeStatsPerEvent.line(
220             masterTimes.getLeft().get(),
221             100.0 * masterTimes.getRight().get() /
222               (masterTimes.getRight().get() + workerTimes.getRight().get()),
223             masterTimes.getRight().get(),
224             "master") +
225         TimeStatsPerEvent.line(
226             workerTimes.getLeft().get(),
227             100.0 * workerTimes.getRight().get() /
228               (masterTimes.getRight().get() + workerTimes.getRight().get()),
229             workerTimes.getRight().get(),
230             "worker"));
231   }
232 
233   /**
234    * Class tracking invocation count and elapsed time for a set of events,
235    * each event being having a String name.
236    */
237   public static class TimeStatsPerEvent {
238     private final String groupName;
239     private final Map<String, IntLongWritable> keyToCountAndTime =
240         new TreeMap<>();
241 
242     public TimeStatsPerEvent(String groupName) {
243       this.groupName = groupName;
244     }
245 
246     public void inc(String name, long millis) {
247       IntLongWritable val = keyToCountAndTime.get(name);
248       if (val == null) {
249         val = new IntLongWritable();
250         keyToCountAndTime.put(name, val);
251       }
252       val.getLeft().set(val.getLeft().get() + 1);
253       val.getRight().set(val.getRight().get() + millis);
254     }
255 
256     public IntLongWritable logTimeSums() {
257       StringBuilder sb = new StringBuilder("Time sums " + groupName + ":\n");
258       sb.append(header());
259       long total = 0;
260       int count = 0;
261       for (Entry<String, IntLongWritable> entry :
262             keyToCountAndTime.entrySet()) {
263         total += entry.getValue().getRight().get();
264         count += entry.getValue().getLeft().get();
265       }
266 
267       for (Entry<String, IntLongWritable> entry :
268             keyToCountAndTime.entrySet()) {
269         sb.append(line(
270             entry.getValue().getLeft().get(),
271             (100.0 * entry.getValue().getRight().get()) / total,
272             entry.getValue().getRight().get(),
273             entry.getKey()));
274       }
275       LOG.info(sb);
276       return new IntLongWritable(count, total);
277     }
278 
279     public static String header() {
280       return String.format(
281           "%10s%10s%11s   %s%n", "count", "time %", "time", "name");
282     }
283 
284     public static String line(
285         int count, double percTime, long time, String name) {
286       return String.format("%10d%9.2f%%%11s   %s%n",
287           count,
288           percTime,
289           DurationFormatUtils.formatDuration(time, "HH:mm:ss"),
290           name);
291     }
292   }
293 }