This project has retired. For details please refer to its Attic page.
SimpleVertexWithWorkerContext xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.job.GiraphJob;
25  import org.apache.giraph.worker.WorkerContext;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.io.DoubleWritable;
30  import org.apache.hadoop.io.FloatWritable;
31  import org.apache.hadoop.io.IntWritable;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.hadoop.mapreduce.Mapper.Context;
34  import org.apache.hadoop.util.Tool;
35  import org.apache.hadoop.util.ToolRunner;
36  
37  import java.io.DataOutputStream;
38  import java.io.IOException;
39  
40  /**
41   * Fully runnable example of how to
42   * emit worker data to HDFS during a graph
43   * computation.
44   */
45  public class SimpleVertexWithWorkerContext implements Tool {
46    /** Directory name of where to write. */
47    public static final String OUTPUTDIR = "svwwc.outputdir";
48    /** Halting condition for the number of supersteps */
49    private static final int TESTLENGTH = 30;
50    /** Configuration */
51    private Configuration conf;
52  
53    @Override
54    public void setConf(Configuration conf) {
55      this.conf = conf;
56    }
57  
58    @Override
59    public Configuration getConf() {
60      return conf;
61    }
62  
63    /**
64     * Actual vetex implementation
65     */
66    public static class SimpleComputation extends BasicComputation<LongWritable,
67        IntWritable, FloatWritable, DoubleWritable> {
68      @Override
69      public void compute(
70          Vertex<LongWritable, IntWritable, FloatWritable> vertex,
71          Iterable<DoubleWritable> messages) throws IOException {
72  
73        long superstep = getSuperstep();
74  
75        if (superstep < TESTLENGTH) {
76          EmitterWorkerContext emitter = getWorkerContext();
77          emitter.emit("vertexId=" + vertex.getId() +
78              " superstep=" + superstep + "\n");
79        } else {
80          vertex.voteToHalt();
81        }
82      }
83    }
84  
85    /**
86     * Example worker context to emit data as part of a superstep.
87     */
88    @SuppressWarnings("rawtypes")
89    public static class EmitterWorkerContext extends WorkerContext {
90      /** File name prefix */
91      private static final String FILENAME = "emitter_";
92      /** Output stream to dump the strings. */
93      private DataOutputStream out;
94  
95      @Override
96      public void preApplication() {
97        Context context = getContext();
98        FileSystem fs;
99  
100       try {
101         fs = FileSystem.get(context.getConfiguration());
102 
103         String p = context.getConfiguration()
104             .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
105         if (p == null) {
106           throw new IllegalArgumentException(
107               SimpleVertexWithWorkerContext.OUTPUTDIR +
108               " undefined!");
109         }
110 
111         Path path = new Path(p);
112         if (!fs.exists(path)) {
113           throw new IllegalArgumentException(path +
114               " doesn't exist");
115         }
116 
117         Path outF = new Path(path, FILENAME +
118             context.getTaskAttemptID());
119         if (fs.exists(outF)) {
120           throw new IllegalArgumentException(outF +
121               " aready exists");
122         }
123 
124         out = fs.create(outF);
125       } catch (IOException e) {
126         throw new RuntimeException(
127             "can't initialize WorkerContext", e);
128       }
129     }
130 
131     @Override
132     public void postApplication() {
133       if (out != null) {
134         try {
135           out.flush();
136           out.close();
137         } catch (IOException e) {
138           throw new RuntimeException(
139               "can't finalize WorkerContext", e);
140         }
141         out = null;
142       }
143     }
144 
145     @Override
146     public void preSuperstep() { }
147 
148     @Override
149     public void postSuperstep() { }
150 
151     /**
152      * Write this string to the output stream.
153      *
154      * @param s String to dump.
155      */
156     public void emit(String s) {
157       try {
158         out.writeUTF(s);
159       } catch (IOException e) {
160         throw new RuntimeException("can't emit", e);
161       }
162     }
163   }
164 
165   @Override
166   public int run(String[] args) throws Exception {
167     if (args.length != 2) {
168       throw new IllegalArgumentException(
169           "run: Must have 2 arguments <output path> <# of workers>");
170     }
171     GiraphJob job = new GiraphJob(getConf(), getClass().getName());
172     job.getConfiguration().setComputationClass(SimpleComputation.class);
173     job.getConfiguration().setVertexInputFormatClass(
174         SimpleSuperstepVertexInputFormat.class);
175     job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
176     job.getConfiguration().set(
177         SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
178     job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
179         Integer.parseInt(args[1]),
180         100.0f);
181     if (job.run(true)) {
182       return 0;
183     } else {
184       return -1;
185     }
186   }
187 
188   /**
189    * Executable from the command line.
190    *
191    * @param args Command line arguments.
192    * @throws Exception
193    */
194   public static void main(String[] args) throws Exception {
195     System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
196   }
197 }