This project has retired. For details please refer to its Attic page.
LocalBlockRunner xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.io.IOException;
21  import java.util.List;
22  import java.util.concurrent.CountDownLatch;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.Executors;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.apache.giraph.block_app.framework.BlockFactory;
29  import org.apache.giraph.block_app.framework.BlockUtils;
30  import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
31  import org.apache.giraph.block_app.framework.block.Block;
32  import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
33  import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
34  import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
35  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
36  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
37  import org.apache.giraph.conf.BooleanConfOption;
38  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
39  import org.apache.giraph.conf.IntConfOption;
40  import org.apache.giraph.graph.OnlyIdVertex;
41  import org.apache.giraph.graph.Vertex;
42  import org.apache.giraph.io.SimpleVertexWriter;
43  import org.apache.giraph.partition.Partition;
44  import org.apache.giraph.utils.InternalVertexRunner;
45  import org.apache.giraph.utils.TestGraph;
46  import org.apache.giraph.utils.Trimmable;
47  import org.apache.giraph.utils.WritableUtils;
48  import org.apache.giraph.writable.kryo.KryoWritableWrapper;
49  import org.apache.hadoop.io.Writable;
50  import org.apache.hadoop.io.WritableComparable;
51  import org.apache.hadoop.util.Progressable;
52  
53  import com.google.common.base.Preconditions;
54  import com.google.common.collect.Iterables;
55  
56  /**
57   * Local in-memory Block application job runner.
58   * Implementation should be faster then using InternalVertexRunner.
59   *
60   * Useful for fast testing.
61   */
62  @SuppressWarnings({ "rawtypes", "unchecked" })
63  public class LocalBlockRunner {
64    /** Number of threads to use */
65    public static final IntConfOption NUM_THREADS = new IntConfOption(
66        "test.LocalBlockRunner.NUM_THREADS", 3, "");
67    /** Number of vertex partitions */
68    public static final IntConfOption NUM_PARTITIONS = new IntConfOption(
69        "test.LocalBlockRunner.NUM_PARTITIONS", 16, "");
70    /**
71     * Whether to run all supported checks. Disable if you are running this
72     * not within a unit test, and on a large graph, where performance matters.
73     */
74    public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
75        "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
76    // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
77    public static final BooleanConfOption SERIALIZE_MASTER =
78        new BooleanConfOption(
79            "test.LocalBlockRunner.SERIALIZE_MASTER", false, "");
80  
81    private LocalBlockRunner() { }
82  
83    /**
84     * Run Block Application specified within the conf, on a given graph,
85     * locally, in-memory.
86     *
87     * With a boolean flag, you can switch between LocalBlockRunner and
88     * InternalVertexRunner implementations of local in-memory computation.
89     */
90    public static
91    <I extends WritableComparable, V extends Writable, E extends Writable>
92    TestGraph<I, V, E> runApp(
93        TestGraph<I, V, E> graph, boolean useFullDigraphTests) throws Exception {
94      if (useFullDigraphTests) {
95        return InternalVertexRunner.runWithInMemoryOutput(graph.getConf(), graph);
96      } else {
97        runApp(graph);
98        return graph;
99      }
100   }
101 
102   /**
103    * Run Block Application specified within the conf, on a given graph,
104    * locally, in-memory.
105    */
106   public static
107   <I extends WritableComparable, V extends Writable, E extends Writable>
108   void runApp(TestGraph<I, V, E> graph) {
109     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
110     runAppWithVertexOutput(graph, noOpVertexSaver);
111   }
112 
113   /**
114    * Run Block from a specified execution stage on a given graph,
115    * locally, in-memory.
116    */
117   public static
118   <I extends WritableComparable, V extends Writable, E extends Writable>
119   void runBlock(
120       TestGraph<I, V, E> graph, Block block, Object executionStage) {
121     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
122     runBlockWithVertexOutput(
123         block, executionStage, graph, noOpVertexSaver);
124   }
125 
126 
127   /**
128    * Run Block Application specified within the conf, on a given graph,
129    * locally, in-memory, with a given vertexSaver.
130    */
131   public static
132   <I extends WritableComparable, V extends Writable, E extends Writable>
133   void runAppWithVertexOutput(
134       TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
135     BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
136     runBlockWithVertexOutput(
137         factory.createBlock(graph.getConf()),
138         factory.createExecutionStage(graph.getConf()),
139         graph, vertexSaver);
140   }
141 
142   /**
143    * Run Block from a specified execution stage on a given graph,
144    * locally, in-memory, with a given vertexSaver.
145    */
146   public static
147   <I extends WritableComparable, V extends Writable, E extends Writable>
148   void runBlockWithVertexOutput(
149       Block block, Object executionStage, TestGraph<I, V, E> graph,
150       final SimpleVertexWriter<I, V, E> vertexSaver
151   ) {
152     Preconditions.checkNotNull(block);
153     Preconditions.checkNotNull(graph);
154     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
155     int numThreads = NUM_THREADS.get(conf);
156     int numPartitions = NUM_PARTITIONS.get(conf);
157     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
158     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
159     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
160 
161     final InternalApi internalApi =
162         new InternalApi(graph, conf, numPartitions, runAllChecks);
163     final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
164 
165     BlockUtils.checkBlockTypes(block, executionStage, conf);
166 
167     BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
168     blockMasterLogic.initialize(block, executionStage, internalApi);
169 
170     BlockWorkerContextLogic workerContextLogic =
171         internalApi.getWorkerContextLogic();
172     workerContextLogic.preApplication(internalWorkerApi,
173         new BlockOutputHandle("", conf, new Progressable() {
174           @Override
175           public void progress() {
176           }
177         }));
178 
179     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
180 
181     if (runAllChecks) {
182       for (Vertex<I, V, E> vertex : graph) {
183         V value = conf.createVertexValue();
184         WritableUtils.copyInto(vertex.getValue(), value);
185         vertex.setValue(value);
186 
187         vertex.setEdges((Iterable) WritableUtils.createCopy(
188             (Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf));
189       }
190     }
191 
192     final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
193 
194     for (int superstep = 0;; superstep++) {
195       // serialize master to test continuable computation
196       if (serializeMaster) {
197         blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy(
198             new KryoWritableWrapper<>(blockMasterLogic),
199             KryoWritableWrapper.class,
200             conf).get();
201         blockMasterLogic.initializeAfterRead(internalApi);
202       }
203 
204       if (!anyVertexAlive.get()) {
205         break;
206       }
207 
208       final BlockWorkerPieces workerPieces =
209           blockMasterLogic.computeNext(superstep);
210       if (workerPieces == null) {
211         if (!conf.doOutputDuringComputation()) {
212           List<Partition<I, V, E>> partitions = internalApi.getPartitions();
213           for (Partition<I, V, E> partition : partitions) {
214             for (Vertex<I, V, E> vertex : partition) {
215               try {
216                 vertexSaver.writeVertex(vertex);
217               } catch (IOException | InterruptedException e) {
218                 throw new RuntimeException(e);
219               }
220             }
221           }
222         }
223         int left = executor.shutdownNow().size();
224         Preconditions.checkState(0 == left, "Some work still left to be done?");
225         break;
226       } else {
227         internalApi.afterMasterBeforeWorker(workerPieces);
228         List<Partition<I, V, E>> partitions = internalApi.getPartitions();
229 
230         workerContextLogic.preSuperstep(
231             internalWorkerApi,
232             internalWorkerApi,
233             KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
234             internalApi.takeWorkerMessages());
235 
236         final CountDownLatch latch = new CountDownLatch(numPartitions);
237         final AtomicReference<Throwable> exception = new AtomicReference<>();
238         anyVertexAlive.set(false);
239         for (final Partition<I, V, E> partition : partitions) {
240           executor.execute(new Runnable() {
241             @Override
242             public void run() {
243               try {
244                 boolean anyCurVertexAlive = false;
245                 BlockWorkerPieces localPieces =
246                     KryoWritableWrapper.wrapAndCopy(workerPieces);
247 
248                 BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
249                 localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
250 
251                 if (internalApi.ignoreExistingVertices()) {
252                   Iterable<I> destinations =
253                       internalApi.getPartitionDestinationVertices(
254                           partition.getId());
255                   if (!Iterables.isEmpty(destinations)) {
256                     OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
257 
258                     for (I vertexId : destinations) {
259                       Iterable messages = internalApi.takeMessages(vertexId);
260                       Preconditions.checkState(!Iterables.isEmpty(messages));
261                       vertex.setId(vertexId);
262                       localLogic.compute(vertex, messages);
263 
264                       anyCurVertexAlive = true;
265                     }
266                   }
267                 } else {
268                   for (Vertex<I, V, E> vertex : partition) {
269                     Iterable messages =
270                         internalApi.takeMessages(vertex.getId());
271                     if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
272                       vertex.wakeUp();
273                     }
274                     // Equivalent of ComputeCallable.computePartition
275                     if (!vertex.isHalted()) {
276                       localLogic.compute(vertex, messages);
277 
278                       // Need to unwrap the mutated edges (possibly)
279                       vertex.unwrapMutableEdges();
280                       //Compact edges representation if possible
281                       if (vertex instanceof Trimmable) {
282                         ((Trimmable) vertex).trim();
283                       }
284                       // Write vertex to superstep output
285                       // (no-op if it is not used)
286                       if (doOutputDuringComputation) {
287                         vertexSaver.writeVertex(vertex);
288                       }
289                       // Need to save the vertex changes (possibly)
290                       partition.saveVertex(vertex);
291                     }
292 
293                     if (!vertex.isHalted()) {
294                       anyCurVertexAlive = true;
295                     }
296                   }
297                 }
298 
299                 if (anyCurVertexAlive) {
300                   anyVertexAlive.set(true);
301                 }
302                 localLogic.postSuperstep();
303               // CHECKSTYLE: stop IllegalCatch
304               // Need to propagate all exceptions within test
305               } catch (Throwable t) {
306               // CHECKSTYLE: resume IllegalCatch
307                 t.printStackTrace();
308                 exception.set(t);
309               }
310 
311               latch.countDown();
312             }
313           });
314         }
315 
316         try {
317           latch.await();
318         } catch (InterruptedException e) {
319           throw new RuntimeException("Thread intentionally interrupted", e);
320         }
321 
322         if (exception.get() != null) {
323           throw new RuntimeException("Worker failed", exception.get());
324         }
325 
326         workerContextLogic.postSuperstep();
327 
328         internalApi.afterWorkerBeforeMaster();
329       }
330     }
331 
332     workerContextLogic.postApplication();
333     internalApi.postApplication();
334   }
335 
336   private static
337   <I extends WritableComparable, E extends Writable, V extends Writable>
338   SimpleVertexWriter<I, V, E> noOpVertexSaver() {
339     return new SimpleVertexWriter<I, V, E>() {
340       @Override
341       public void writeVertex(Vertex<I, V, E> vertex)
342           throws IOException, InterruptedException {
343         // No-op
344       }
345     };
346   }
347 
348 }