This project has retired. For details please refer to its
        
        Attic page.
      
 
BlockMasterLogic xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.block_app.framework.internal;
19  
20  import java.util.HashSet;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Map.Entry;
24  import java.util.TreeMap;
25  
26  import org.apache.commons.lang3.time.DurationFormatUtils;
27  import org.apache.giraph.block_app.framework.BlockFactory;
28  import org.apache.giraph.block_app.framework.BlockUtils;
29  import org.apache.giraph.block_app.framework.api.BlockApiHandle;
30  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
31  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
32  import org.apache.giraph.block_app.framework.block.Block;
33  import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
34  import org.apache.giraph.block_app.framework.piece.AbstractPiece;
35  import org.apache.giraph.conf.GiraphConfiguration;
36  import org.apache.giraph.function.Consumer;
37  import org.apache.giraph.writable.tuple.IntLongWritable;
38  import org.apache.log4j.Logger;
39  import com.google.common.base.Preconditions;
40  
41  
42  
43  
44  
45  
46  
47  
48  @SuppressWarnings("rawtypes")
49  public class BlockMasterLogic<S> {
50    private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class);
51  
52    private Iterator<AbstractPiece> pieceIterator;
53    private PairedPieceAndStage<S> previousPiece;
54    private transient BlockMasterApi masterApi;
55    private long lastTimestamp = -1;
56    private BlockWorkerPieces previousWorkerPieces;
57    private boolean computationDone;
58    private BlockApiHandle blockApiHandle;
59  
60    
61    private final TimeStatsPerEvent masterPerPieceTimeStats =
62        new TimeStatsPerEvent("master");
63    
64    private final TimeStatsPerEvent workerPerPieceTimeStats =
65        new TimeStatsPerEvent("worker");
66  
67    
68  
69  
70  
71    public void initialize(
72        GiraphConfiguration conf, final BlockMasterApi masterApi) {
73      BlockFactory<S> factory = BlockUtils.createBlockFactory(conf);
74      initialize(factory.createBlock(conf), factory.createExecutionStage(conf),
75          masterApi);
76    }
77  
78    
79  
80  
81  
82    public void initialize(
83        Block executionBlock, S executionStage, final BlockMasterApi masterApi) {
84      this.masterApi = masterApi;
85      this.computationDone = false;
86  
87      LOG.info("Executing application - " + executionBlock);
88      if (executionBlock instanceof BlockWithApiHandle) {
89        blockApiHandle =
90          ((BlockWithApiHandle) executionBlock).getBlockApiHandle();
91      }
92      if (blockApiHandle == null) {
93        blockApiHandle = new BlockApiHandle();
94      }
95      blockApiHandle.setMasterApi(masterApi);
96  
97      
98      executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
99        private final HashSet<AbstractPiece> registeredPieces = new HashSet<>();
100       @SuppressWarnings("deprecation")
101       @Override
102       public void apply(AbstractPiece piece) {
103         
104         if (registeredPieces.add(piece)) {
105           try {
106             piece.registerAggregators(masterApi);
107           } catch (InstantiationException | IllegalAccessException e) {
108             throw new RuntimeException(e);
109           }
110         }
111       }
112     });
113 
114     pieceIterator = executionBlock.iterator();
115     
116     
117     
118     
119     
120     previousPiece = new PairedPieceAndStage<>(null, executionStage);
121   }
122 
123   
124 
125 
126 
127 
128   public void initializeAfterRead(BlockMasterApi masterApi) {
129     this.masterApi = masterApi;
130   }
131 
132   
133 
134 
135 
136 
137 
138 
139 
140   public BlockWorkerPieces<S> computeNext(long superstep) {
141     long beforeMaster = System.currentTimeMillis();
142     if (lastTimestamp != -1) {
143       BlockCounters.setWorkerTimeCounter(
144           previousWorkerPieces, superstep - 1,
145           beforeMaster - lastTimestamp, masterApi, workerPerPieceTimeStats);
146     }
147 
148     if (previousPiece == null) {
149       postApplication();
150       return null;
151     } else {
152       boolean logExecutionStatus =
153           BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
154       if (logExecutionStatus) {
155         LOG.info("Master executing " + previousPiece +
156             ", in superstep " + superstep);
157       }
158       previousPiece.masterCompute(masterApi);
159       ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
160           returnAllWriters();
161       long afterMaster = System.currentTimeMillis();
162 
163       if (previousPiece.getPiece() != null) {
164         BlockCounters.setMasterTimeCounter(
165             previousPiece, superstep, afterMaster - beforeMaster, masterApi,
166             masterPerPieceTimeStats);
167       }
168 
169       PairedPieceAndStage<S> nextPiece;
170       if (pieceIterator.hasNext()) {
171         nextPiece = new PairedPieceAndStage<S>(
172             pieceIterator.next(), previousPiece.nextExecutionStage());
173         nextPiece.registerReducers(masterApi);
174       } else {
175         nextPiece = null;
176       }
177       BlockCounters.setStageCounters(
178           "Master finished stage: ", previousPiece.getExecutionStage(),
179           masterApi);
180       if (logExecutionStatus) {
181         LOG.info(
182             "Master passing next " + nextPiece + ", in superstep " + superstep);
183       }
184 
185       
186       
187       BlockWorkerPieces<S> result;
188       if (previousPiece.getPiece() == null && nextPiece == null) {
189         postApplication();
190         result = null;
191       } else {
192         result = new BlockWorkerPieces<>(
193           previousPiece, nextPiece, blockApiHandle);
194         if (logExecutionStatus) {
195           LOG.info("Master in " + superstep + " superstep passing " +
196               result + " to be executed");
197         }
198       }
199 
200       previousPiece = nextPiece;
201       lastTimestamp = afterMaster;
202       previousWorkerPieces = result;
203       return result;
204     }
205   }
206 
207   
208 
209 
210   private void postApplication() {
211     ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
212         closeAllWriters();
213     Preconditions.checkState(!computationDone);
214     computationDone = true;
215     IntLongWritable masterTimes = masterPerPieceTimeStats.logTimeSums();
216     IntLongWritable workerTimes = workerPerPieceTimeStats.logTimeSums();
217     LOG.info("Time split:\n" +
218         TimeStatsPerEvent.header() +
219         TimeStatsPerEvent.line(
220             masterTimes.getLeft().get(),
221             100.0 * masterTimes.getRight().get() /
222               (masterTimes.getRight().get() + workerTimes.getRight().get()),
223             masterTimes.getRight().get(),
224             "master") +
225         TimeStatsPerEvent.line(
226             workerTimes.getLeft().get(),
227             100.0 * workerTimes.getRight().get() /
228               (masterTimes.getRight().get() + workerTimes.getRight().get()),
229             workerTimes.getRight().get(),
230             "worker"));
231   }
232 
233   
234 
235 
236 
237   public static class TimeStatsPerEvent {
238     private final String groupName;
239     private final Map<String, IntLongWritable> keyToCountAndTime =
240         new TreeMap<>();
241 
242     public TimeStatsPerEvent(String groupName) {
243       this.groupName = groupName;
244     }
245 
246     public void inc(String name, long millis) {
247       IntLongWritable val = keyToCountAndTime.get(name);
248       if (val == null) {
249         val = new IntLongWritable();
250         keyToCountAndTime.put(name, val);
251       }
252       val.getLeft().set(val.getLeft().get() + 1);
253       val.getRight().set(val.getRight().get() + millis);
254     }
255 
256     public IntLongWritable logTimeSums() {
257       StringBuilder sb = new StringBuilder("Time sums " + groupName + ":\n");
258       sb.append(header());
259       long total = 0;
260       int count = 0;
261       for (Entry<String, IntLongWritable> entry :
262             keyToCountAndTime.entrySet()) {
263         total += entry.getValue().getRight().get();
264         count += entry.getValue().getLeft().get();
265       }
266 
267       for (Entry<String, IntLongWritable> entry :
268             keyToCountAndTime.entrySet()) {
269         sb.append(line(
270             entry.getValue().getLeft().get(),
271             (100.0 * entry.getValue().getRight().get()) / total,
272             entry.getValue().getRight().get(),
273             entry.getKey()));
274       }
275       LOG.info(sb);
276       return new IntLongWritable(count, total);
277     }
278 
279     public static String header() {
280       return String.format(
281           "%10s%10s%11s   %s%n", "count", "time %", "time", "name");
282     }
283 
284     public static String line(
285         int count, double percTime, long time, String name) {
286       return String.format("%10d%9.2f%%%11s   %s%n",
287           count,
288           percTime,
289           DurationFormatUtils.formatDuration(time, "HH:mm:ss"),
290           name);
291     }
292   }
293 }