View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.apache.giraph.block_app.framework.BlockFactory;
29  import org.apache.giraph.block_app.framework.BlockUtils;
30  import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
31  import org.apache.giraph.block_app.framework.block.Block;
32  import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
33  import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
34  import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
35  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
36  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
37  import org.apache.giraph.conf.BooleanConfOption;
38  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39  import org.apache.giraph.conf.IntConfOption;
40  import org.apache.giraph.graph.Vertex;
41  import org.apache.giraph.io.SimpleVertexWriter;
42  import org.apache.giraph.partition.Partition;
43  import org.apache.giraph.utils.InternalVertexRunner;
44  import org.apache.giraph.utils.TestGraph;
45  import org.apache.giraph.utils.Trimmable;
46  import org.apache.giraph.utils.WritableUtils;
47  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
48  import org.apache.hadoop.io.Writable;
49  import org.apache.hadoop.io.WritableComparable;
50  import org.apache.hadoop.util.Progressable;
51  
52  import com.google.common.base.Preconditions;
53  import com.google.common.collect.Iterables;
54  
55  /**
56   * Local in-memory Block application job runner.
57   * Implementation should be faster then using InternalVertexRunner.
58   *
59   * Useful for fast testing.
60   */
61  @SuppressWarnings({ "rawtypes", "unchecked" })
62  public class LocalBlockRunner {
63    /** Number of threads to use */
64    public static final IntConfOption NUM_THREADS = new IntConfOption(
65        "test.LocalBlockRunner.NUM_THREADS", 3, "");
66    /**
67     * Whether to run all supported checks. Disable if you are running this
68     * not within a unit test, and on a large graph, where performance matters.
69     */
70    public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
71        "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
72    // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
73    public static final BooleanConfOption SERIALIZE_MASTER =
74        new BooleanConfOption(
75            "test.LocalBlockRunner.SERIALIZE_MASTER", false, "");
76  
77    private LocalBlockRunner() { }
78  
79    /**
80     * Run Block Application specified within the conf, on a given graph,
81     * locally, in-memory.
82     *
83     * With a boolean flag, you can switch between LocalBlockRunner and
84     * InternalVertexRunner implementations of local in-memory computation.
85     */
86    public static
87    <I extends WritableComparable, V extends Writable, E extends Writable>
88    TestGraph<I, V, E> runApp(
89        TestGraph<I, V, E> graph, boolean useFullDigraphTests) throws Exception {
90      if (useFullDigraphTests) {
91        return InternalVertexRunner.runWithInMemoryOutput(graph.getConf(), graph);
92      } else {
93        runApp(graph);
94        return graph;
95      }
96    }
97  
98    /**
99     * Run Block Application specified within the conf, on a given graph,
100    * locally, in-memory.
101    */
102   public static
103   <I extends WritableComparable, V extends Writable, E extends Writable>
104   void runApp(TestGraph<I, V, E> graph) {
105     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
106     runAppWithVertexOutput(graph, noOpVertexSaver);
107   }
108 
109   /**
110    * Run Block from a specified execution stage on a given graph,
111    * locally, in-memory.
112    */
113   public static
114   <I extends WritableComparable, V extends Writable, E extends Writable>
115   void runBlock(
116       TestGraph<I, V, E> graph, Block block, Object executionStage) {
117     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
118     runBlockWithVertexOutput(
119         block, executionStage, graph, noOpVertexSaver);
120   }
121 
122 
123   /**
124    * Run Block Application specified within the conf, on a given graph,
125    * locally, in-memory, with a given vertexSaver.
126    */
127   public static
128   <I extends WritableComparable, V extends Writable, E extends Writable>
129   void runAppWithVertexOutput(
130       TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
131     BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
132     runBlockWithVertexOutput(
133         factory.createBlock(graph.getConf()),
134         factory.createExecutionStage(graph.getConf()),
135         graph, vertexSaver);
136   }
137 
138   /**
139    * Run Block from a specified execution stage on a given graph,
140    * locally, in-memory, with a given vertexSaver.
141    */
142   public static
143   <I extends WritableComparable, V extends Writable, E extends Writable>
144   void runBlockWithVertexOutput(
145       Block block, Object executionStage, TestGraph<I, V, E> graph,
146       final SimpleVertexWriter<I, V, E> vertexSaver
147   ) {
148     Preconditions.checkNotNull(block);
149     Preconditions.checkNotNull(graph);
150     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
151     int numPartitions = NUM_THREADS.get(conf);
152     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
153     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
154     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
155 
156     final InternalApi internalApi =
157         new InternalApi(graph, conf, numPartitions, runAllChecks);
158     final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
159 
160     BlockUtils.checkBlockTypes(block, executionStage, conf);
161 
162     BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
163     blockMasterLogic.initialize(block, executionStage, internalApi);
164 
165     BlockWorkerContextLogic workerContextLogic =
166         internalApi.getWorkerContextLogic();
167     workerContextLogic.preApplication(internalWorkerApi,
168         new BlockOutputHandle("", conf, new Progressable() {
169           @Override
170           public void progress() {
171           }
172         }));
173 
174     ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
175 
176     if (runAllChecks) {
177       for (Vertex<I, V, E> vertex : graph) {
178         V value = conf.createVertexValue();
179         WritableUtils.copyInto(vertex.getValue(), value);
180         vertex.setValue(value);
181 
182         vertex.setEdges((Iterable) WritableUtils.createCopy(
183             (Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf));
184       }
185     }
186 
187     final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
188 
189     for (int superstep = 0;; superstep++) {
190       // serialize master to test continuable computation
191       if (serializeMaster) {
192         blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy(
193             new KryoWritableWrapper<>(blockMasterLogic),
194             KryoWritableWrapper.class,
195             conf).get();
196         blockMasterLogic.initializeAfterRead(internalApi);
197       }
198 
199       if (!anyVertexAlive.get()) {
200         break;
201       }
202 
203       final BlockWorkerPieces workerPieces =
204           blockMasterLogic.computeNext(superstep);
205       if (workerPieces == null) {
206         if (!conf.doOutputDuringComputation()) {
207           List<Partition<I, V, E>> partitions = internalApi.getPartitions();
208           for (Partition<I, V, E> partition : partitions) {
209             for (Vertex<I, V, E> vertex : partition) {
210               try {
211                 vertexSaver.writeVertex(vertex);
212               } catch (IOException | InterruptedException e) {
213                 throw new RuntimeException(e);
214               }
215             }
216           }
217         }
218         int left = executor.shutdownNow().size();
219         Preconditions.checkState(0 == left, "Some work still left to be done?");
220         break;
221       } else {
222         internalApi.afterMasterBeforeWorker(workerPieces);
223         List<Partition<I, V, E>> partitions = internalApi.getPartitions();
224 
225         workerContextLogic.preSuperstep(
226             internalWorkerApi,
227             internalWorkerApi,
228             KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
229             internalApi.takeWorkerMessages());
230 
231         final CountDownLatch latch = new CountDownLatch(numPartitions);
232         final AtomicReference<Throwable> exception = new AtomicReference<>();
233         anyVertexAlive.set(false);
234         for (final Partition<I, V, E> partition : partitions) {
235           executor.execute(new Runnable() {
236             @Override
237             public void run() {
238               try {
239                 boolean anyCurVertexAlive = false;
240                 BlockWorkerPieces localPieces =
241                     KryoWritableWrapper.wrapAndCopy(workerPieces);
242 
243                 BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
244                 localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
245 
246                 for (Vertex<I, V, E> vertex : partition) {
247                   Iterable messages = internalApi.takeMessages(vertex.getId());
248                   if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
249                     vertex.wakeUp();
250                   }
251                   // Equivalent of ComputeCallable.computePartition
252                   if (!vertex.isHalted()) {
253                     localLogic.compute(vertex, messages);
254 
255                     // Need to unwrap the mutated edges (possibly)
256                     vertex.unwrapMutableEdges();
257                     //Compact edges representation if possible
258                     if (vertex instanceof Trimmable) {
259                       ((Trimmable) vertex).trim();
260                     }
261                     // Write vertex to superstep output
262                     // (no-op if it is not used)
263                     if (doOutputDuringComputation) {
264                       vertexSaver.writeVertex(vertex);
265                     }
266                     // Need to save the vertex changes (possibly)
267                     partition.saveVertex(vertex);
268                   }
269 
270                   if (!vertex.isHalted()) {
271                     anyCurVertexAlive = true;
272                   }
273                 }
274 
275                 if (anyCurVertexAlive) {
276                   anyVertexAlive.set(true);
277                 }
278                 localLogic.postSuperstep();
279               // CHECKSTYLE: stop IllegalCatch
280               // Need to propagate all exceptions within test
281               } catch (Throwable t) {
282               // CHECKSTYLE: resume IllegalCatch
283                 t.printStackTrace();
284                 exception.set(t);
285               }
286 
287               latch.countDown();
288             }
289           });
290         }
291 
292         try {
293           latch.await();
294         } catch (InterruptedException e) {
295           throw new RuntimeException("Thread intentionally interrupted", e);
296         }
297 
298         if (exception.get() != null) {
299           throw new RuntimeException("Worker failed", exception.get());
300         }
301 
302         workerContextLogic.postSuperstep();
303 
304         internalApi.afterWorkerBeforeMaster();
305       }
306     }
307 
308     workerContextLogic.postApplication();
309     internalApi.postApplication();
310   }
311 
312   private static
313   <I extends WritableComparable, E extends Writable, V extends Writable>
314   SimpleVertexWriter<I, V, E> noOpVertexSaver() {
315     return new SimpleVertexWriter<I, V, E>() {
316       @Override
317       public void writeVertex(Vertex<I, V, E> vertex)
318           throws IOException, InterruptedException {
319         // No-op
320       }
321     };
322   }
323 
324 }