View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.commons.cli.CommandLine;
22  import org.apache.commons.cli.CommandLineParser;
23  import org.apache.commons.cli.HelpFormatter;
24  import org.apache.commons.cli.Options;
25  import org.apache.commons.cli.PosixParser;
26  import org.apache.giraph.aggregators.LongSumAggregator;
27  import org.apache.giraph.graph.BasicComputation;
28  import org.apache.giraph.edge.Edge;
29  import org.apache.giraph.edge.EdgeFactory;
30  import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
31  import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
32  import org.apache.giraph.job.GiraphJob;
33  import org.apache.giraph.master.DefaultMasterCompute;
34  import org.apache.giraph.graph.Vertex;
35  import org.apache.giraph.worker.WorkerContext;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.io.FloatWritable;
39  import org.apache.hadoop.io.IntWritable;
40  import org.apache.hadoop.io.LongWritable;
41  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
42  import org.apache.hadoop.util.Tool;
43  import org.apache.hadoop.util.ToolRunner;
44  import org.apache.log4j.Logger;
45  
46  import java.io.IOException;
47  
48  /**
49   * An example that simply uses its id, value, and edges to compute new data
50   * every iteration to verify that checkpoint restarting works.  Fault injection
51   * can also test automated checkpoint restarts.
52   */
53  public class SimpleCheckpoint implements Tool {
54    /** Which superstep to cause the worker to fail */
55    public static final int FAULTING_SUPERSTEP = 4;
56    /** Vertex id to fault on */
57    public static final long FAULTING_VERTEX_ID = 1;
58    /** Dynamically set number of supersteps */
59    public static final String SUPERSTEP_COUNT =
60        "simpleCheckpointVertex.superstepCount";
61    /** Should fault? */
62    public static final String ENABLE_FAULT =
63        "simpleCheckpointVertex.enableFault";
64    /** Class logger */
65    private static final Logger LOG =
66        Logger.getLogger(SimpleCheckpoint.class);
67    /** Configuration */
68    private Configuration conf;
69  
70    /**
71     * Actual computation.
72     */
73    public static class SimpleCheckpointComputation extends
74        BasicComputation<LongWritable, IntWritable, FloatWritable,
75            FloatWritable> {
76      @Override
77      public void compute(
78          Vertex<LongWritable, IntWritable, FloatWritable> vertex,
79          Iterable<FloatWritable> messages) throws IOException {
80        SimpleCheckpointVertexWorkerContext workerContext = getWorkerContext();
81  
82        boolean enableFault = workerContext.getEnableFault();
83        int supersteps = workerContext.getSupersteps();
84  
85        if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
86            (getContext().getTaskAttemptID().getId() == 0) &&
87            (vertex.getId().get() == FAULTING_VERTEX_ID)) {
88          LOG.info("compute: Forced a fault on the first " +
89              "attempt of superstep " +
90              FAULTING_SUPERSTEP + " and vertex id " +
91              FAULTING_VERTEX_ID);
92          System.exit(-1);
93        }
94        if (getSuperstep() > supersteps) {
95          vertex.voteToHalt();
96          return;
97        }
98        long sumAgg = this.<LongWritable>getAggregatedValue(
99            LongSumAggregator.class.getName()).get();
100       LOG.info("compute: " + sumAgg);
101       aggregate(LongSumAggregator.class.getName(),
102           new LongWritable(vertex.getId().get()));
103       LOG.info("compute: sum = " + sumAgg +
104           " for vertex " + vertex.getId());
105       float msgValue = 0.0f;
106       for (FloatWritable message : messages) {
107         float curMsgValue = message.get();
108         msgValue += curMsgValue;
109         LOG.info("compute: got msgValue = " + curMsgValue +
110             " for vertex " + vertex.getId() +
111             " on superstep " + getSuperstep());
112       }
113       int vertexValue = vertex.getValue().get();
114       vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
115       LOG.info("compute: vertex " + vertex.getId() +
116           " has value " + vertex.getValue() +
117           " on superstep " + getSuperstep());
118       for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
119         FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
120             (float) vertexValue);
121         Edge<LongWritable, FloatWritable> newEdge =
122             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
123         LOG.info("compute: vertex " + vertex.getId() +
124             " sending edgeValue " + edge.getValue() +
125             " vertexValue " + vertexValue +
126             " total " + newEdgeValue +
127             " to vertex " + edge.getTargetVertexId() +
128             " on superstep " + getSuperstep());
129         vertex.addEdge(newEdge);
130         sendMessage(edge.getTargetVertexId(), newEdgeValue);
131       }
132     }
133   }
134 
135   /**
136    * Worker context associated with {@link SimpleCheckpoint}.
137    */
138   public static class SimpleCheckpointVertexWorkerContext
139       extends WorkerContext {
140     /** Filename to indicate whether a fault was found */
141     public static final String FAULT_FILE = "/tmp/faultFile";
142     /** User can access this after the application finishes if local */
143     private static long FINAL_SUM;
144     /** Number of supersteps to run (6 by default) */
145     private int supersteps = 6;
146     /** Enable the fault at the particular vertex id and superstep? */
147     private boolean enableFault = false;
148 
149     public static long getFinalSum() {
150       return FINAL_SUM;
151     }
152 
153     @Override
154     public void preApplication()
155       throws InstantiationException, IllegalAccessException {
156       supersteps = getContext().getConfiguration()
157           .getInt(SUPERSTEP_COUNT, supersteps);
158       enableFault = getContext().getConfiguration()
159           .getBoolean(ENABLE_FAULT, false);
160     }
161 
162     @Override
163     public void postApplication() {
164       setFinalSum(this.<LongWritable>getAggregatedValue(
165           LongSumAggregator.class.getName()).get());
166       LOG.info("FINAL_SUM=" + FINAL_SUM);
167     }
168 
169     /**
170      * Set the final sum
171      *
172      * @param value sum
173      */
174     private static void setFinalSum(long value) {
175       FINAL_SUM = value;
176     }
177 
178     @Override
179     public void preSuperstep() {
180     }
181 
182     @Override
183     public void postSuperstep() { }
184 
185     public int getSupersteps() {
186       return this.supersteps;
187     }
188 
189     public boolean getEnableFault() {
190       return this.enableFault;
191     }
192   }
193 
194   @Override
195   public int run(String[] args) throws Exception {
196     Options options = new Options();
197     options.addOption("h", "help", false, "Help");
198     options.addOption("v", "verbose", false, "Verbose");
199     options.addOption("w",
200         "workers",
201         true,
202         "Number of workers");
203     options.addOption("s",
204         "supersteps",
205         true,
206         "Supersteps to execute before finishing");
207     options.addOption("w",
208         "workers",
209         true,
210         "Minimum number of workers");
211     options.addOption("o",
212         "outputDirectory",
213         true,
214         "Output directory");
215     HelpFormatter formatter = new HelpFormatter();
216     if (args.length == 0) {
217       formatter.printHelp(getClass().getName(), options, true);
218       return 0;
219     }
220     CommandLineParser parser = new PosixParser();
221     CommandLine cmd = parser.parse(options, args);
222     if (cmd.hasOption('h')) {
223       formatter.printHelp(getClass().getName(), options, true);
224       return 0;
225     }
226     if (!cmd.hasOption('w')) {
227       LOG.info("Need to choose the number of workers (-w)");
228       return -1;
229     }
230     if (!cmd.hasOption('o')) {
231       LOG.info("Need to set the output directory (-o)");
232       return -1;
233     }
234 
235     GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
236     bspJob.getConfiguration().setComputationClass(
237         SimpleCheckpointComputation.class);
238     bspJob.getConfiguration().setVertexInputFormatClass(
239         GeneratedVertexInputFormat.class);
240     bspJob.getConfiguration().setVertexOutputFormatClass(
241         IdWithValueTextOutputFormat.class);
242     bspJob.getConfiguration().setWorkerContextClass(
243         SimpleCheckpointVertexWorkerContext.class);
244     bspJob.getConfiguration().setMasterComputeClass(
245         SimpleCheckpointVertexMasterCompute.class);
246     int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
247     int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
248     bspJob.getConfiguration().setWorkerConfiguration(
249         minWorkers, maxWorkers, 100.0f);
250 
251     FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
252                                    new Path(cmd.getOptionValue('o')));
253     boolean verbose = false;
254     if (cmd.hasOption('v')) {
255       verbose = true;
256     }
257     if (cmd.hasOption('s')) {
258       getConf().setInt(SUPERSTEP_COUNT,
259           Integer.parseInt(cmd.getOptionValue('s')));
260     }
261     if (bspJob.run(verbose)) {
262       return 0;
263     } else {
264       return -1;
265     }
266   }
267 
268   /**
269    * Master compute associated with {@link SimpleCheckpoint}.
270    * It registers required aggregators.
271    */
272   public static class SimpleCheckpointVertexMasterCompute extends
273       DefaultMasterCompute {
274     @Override
275     public void initialize() throws InstantiationException,
276         IllegalAccessException {
277       registerAggregator(LongSumAggregator.class.getName(),
278           LongSumAggregator.class);
279     }
280   }
281 
282   /**
283    * Executable from the command line.
284    *
285    * @param args Command line args.
286    * @throws Exception
287    */
288   public static void main(String[] args) throws Exception {
289     System.exit(ToolRunner.run(new SimpleCheckpoint(), args));
290   }
291 
292   @Override
293   public Configuration getConf() {
294     return conf;
295   }
296 
297   @Override
298   public void setConf(Configuration conf) {
299     this.conf = conf;
300   }
301 }