This project has retired. For details please refer to its
Attic page.
SimpleCheckpoint xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.io.formats.FileOutputFormatUtil;
import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
47
48
49
50
51
52
53 public class SimpleCheckpoint implements Tool {
54
55 public static final int FAULTING_SUPERSTEP = 4;
56
57 public static final long FAULTING_VERTEX_ID = 1;
58
59 public static final String SUPERSTEP_COUNT =
60 "simpleCheckpointVertex.superstepCount";
61
62 public static final String ENABLE_FAULT =
63 "simpleCheckpointVertex.enableFault";
64
65 private static final Logger LOG =
66 Logger.getLogger(SimpleCheckpoint.class);
67
68 private Configuration conf;
69
70
71
72
73 public static class SimpleCheckpointComputation extends
74 BasicComputation<LongWritable, IntWritable, FloatWritable,
75 FloatWritable> {
76 @Override
77 public void compute(
78 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
79 Iterable<FloatWritable> messages) throws IOException {
80 SimpleCheckpointVertexWorkerContext workerContext = getWorkerContext();
81
82 boolean enableFault = workerContext.getEnableFault();
83 int supersteps = workerContext.getSupersteps();
84
85 if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
86 (getContext().getTaskAttemptID().getId() == 0) &&
87 (vertex.getId().get() == FAULTING_VERTEX_ID)) {
88 LOG.info("compute: Forced a fault on the first " +
89 "attempt of superstep " +
90 FAULTING_SUPERSTEP + " and vertex id " +
91 FAULTING_VERTEX_ID);
92 System.exit(-1);
93 }
94 if (getSuperstep() > supersteps) {
95 vertex.voteToHalt();
96 return;
97 }
98 long sumAgg = this.<LongWritable>getAggregatedValue(
99 LongSumAggregator.class.getName()).get();
100 LOG.info("compute: " + sumAgg);
101 aggregate(LongSumAggregator.class.getName(),
102 new LongWritable(vertex.getId().get()));
103 LOG.info("compute: sum = " + sumAgg +
104 " for vertex " + vertex.getId());
105 float msgValue = 0.0f;
106 for (FloatWritable message : messages) {
107 float curMsgValue = message.get();
108 msgValue += curMsgValue;
109 LOG.info("compute: got msgValue = " + curMsgValue +
110 " for vertex " + vertex.getId() +
111 " on superstep " + getSuperstep());
112 }
113 int vertexValue = vertex.getValue().get();
114 vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
115 LOG.info("compute: vertex " + vertex.getId() +
116 " has value " + vertex.getValue() +
117 " on superstep " + getSuperstep());
118 for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
119 FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
120 (float) vertexValue);
121 Edge<LongWritable, FloatWritable> newEdge =
122 EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
123 LOG.info("compute: vertex " + vertex.getId() +
124 " sending edgeValue " + edge.getValue() +
125 " vertexValue " + vertexValue +
126 " total " + newEdgeValue +
127 " to vertex " + edge.getTargetVertexId() +
128 " on superstep " + getSuperstep());
129 vertex.addEdge(newEdge);
130 sendMessage(edge.getTargetVertexId(), newEdgeValue);
131 }
132 }
133 }
134
135
136
137
138 public static class SimpleCheckpointVertexWorkerContext
139 extends WorkerContext {
140
141 public static final String FAULT_FILE = "/tmp/faultFile";
142
143 private static long FINAL_SUM;
144
145 private int supersteps = 6;
146
147 private boolean enableFault = false;
148
149 public static long getFinalSum() {
150 return FINAL_SUM;
151 }
152
153 @Override
154 public void preApplication()
155 throws InstantiationException, IllegalAccessException {
156 supersteps = getContext().getConfiguration()
157 .getInt(SUPERSTEP_COUNT, supersteps);
158 enableFault = getContext().getConfiguration()
159 .getBoolean(ENABLE_FAULT, false);
160 }
161
162 @Override
163 public void postApplication() {
164 setFinalSum(this.<LongWritable>getAggregatedValue(
165 LongSumAggregator.class.getName()).get());
166 LOG.info("FINAL_SUM=" + FINAL_SUM);
167 }
168
169
170
171
172
173
174 private static void setFinalSum(long value) {
175 FINAL_SUM = value;
176 }
177
178 @Override
179 public void preSuperstep() {
180 }
181
182 @Override
183 public void postSuperstep() { }
184
185 public int getSupersteps() {
186 return this.supersteps;
187 }
188
189 public boolean getEnableFault() {
190 return this.enableFault;
191 }
192 }
193
194 @Override
195 public int run(String[] args) throws Exception {
196 Options options = new Options();
197 options.addOption("h", "help", false, "Help");
198 options.addOption("v", "verbose", false, "Verbose");
199 options.addOption("w",
200 "workers",
201 true,
202 "Number of workers");
203 options.addOption("s",
204 "supersteps",
205 true,
206 "Supersteps to execute before finishing");
207 options.addOption("w",
208 "workers",
209 true,
210 "Minimum number of workers");
211 options.addOption("o",
212 "outputDirectory",
213 true,
214 "Output directory");
215 HelpFormatter formatter = new HelpFormatter();
216 if (args.length == 0) {
217 formatter.printHelp(getClass().getName(), options, true);
218 return 0;
219 }
220 CommandLineParser parser = new PosixParser();
221 CommandLine cmd = parser.parse(options, args);
222 if (cmd.hasOption('h')) {
223 formatter.printHelp(getClass().getName(), options, true);
224 return 0;
225 }
226 if (!cmd.hasOption('w')) {
227 LOG.info("Need to choose the number of workers (-w)");
228 return -1;
229 }
230 if (!cmd.hasOption('o')) {
231 LOG.info("Need to set the output directory (-o)");
232 return -1;
233 }
234
235 GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
236 bspJob.getConfiguration().setComputationClass(
237 SimpleCheckpointComputation.class);
238 bspJob.getConfiguration().setVertexInputFormatClass(
239 GeneratedVertexInputFormat.class);
240 bspJob.getConfiguration().setVertexOutputFormatClass(
241 IdWithValueTextOutputFormat.class);
242 bspJob.getConfiguration().setWorkerContextClass(
243 SimpleCheckpointVertexWorkerContext.class);
244 bspJob.getConfiguration().setMasterComputeClass(
245 SimpleCheckpointVertexMasterCompute.class);
246 int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
247 int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
248 bspJob.getConfiguration().setWorkerConfiguration(
249 minWorkers, maxWorkers, 100.0f);
250
251 FileOutputFormatUtil.setOutputPath(bspJob.getInternalJob(),
252 new Path(cmd.getOptionValue('o')));
253 boolean verbose = false;
254 if (cmd.hasOption('v')) {
255 verbose = true;
256 }
257 if (cmd.hasOption('s')) {
258 getConf().setInt(SUPERSTEP_COUNT,
259 Integer.parseInt(cmd.getOptionValue('s')));
260 }
261 if (bspJob.run(verbose)) {
262 return 0;
263 } else {
264 return -1;
265 }
266 }
267
268
269
270
271
272 public static class SimpleCheckpointVertexMasterCompute extends
273 DefaultMasterCompute {
274 @Override
275 public void initialize() throws InstantiationException,
276 IllegalAccessException {
277 registerAggregator(LongSumAggregator.class.getName(),
278 LongSumAggregator.class);
279 }
280 }
281
282
283
284
285
286
287
288 public static void main(String[] args) throws Exception {
289 System.exit(ToolRunner.run(new SimpleCheckpoint(), args));
290 }
291
292 @Override
293 public Configuration getConf() {
294 return conf;
295 }
296
297 @Override
298 public void setConf(Configuration conf) {
299 this.conf = conf;
300 }
301 }