This project has been retired. For details, please refer to its
Attic page.
SimpleVertexWithWorkerContext xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.job.GiraphJob;
25 import org.apache.giraph.worker.WorkerContext;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.io.DoubleWritable;
30 import org.apache.hadoop.io.FloatWritable;
31 import org.apache.hadoop.io.IntWritable;
32 import org.apache.hadoop.io.LongWritable;
33 import org.apache.hadoop.mapreduce.Mapper.Context;
34 import org.apache.hadoop.util.Tool;
35 import org.apache.hadoop.util.ToolRunner;
36
37 import java.io.DataOutputStream;
38 import java.io.IOException;
39
40
41
42
43
44
45 public class SimpleVertexWithWorkerContext implements Tool {
46
47 public static final String OUTPUTDIR = "svwwc.outputdir";
48
49 private static final int TESTLENGTH = 30;
50
51 private Configuration conf;
52
53 @Override
54 public void setConf(Configuration conf) {
55 this.conf = conf;
56 }
57
58 @Override
59 public Configuration getConf() {
60 return conf;
61 }
62
63
64
65
66 public static class SimpleComputation extends BasicComputation<LongWritable,
67 IntWritable, FloatWritable, DoubleWritable> {
68 @Override
69 public void compute(
70 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
71 Iterable<DoubleWritable> messages) throws IOException {
72
73 long superstep = getSuperstep();
74
75 if (superstep < TESTLENGTH) {
76 EmitterWorkerContext emitter = getWorkerContext();
77 emitter.emit("vertexId=" + vertex.getId() +
78 " superstep=" + superstep + "\n");
79 } else {
80 vertex.voteToHalt();
81 }
82 }
83 }
84
85
86
87
88 @SuppressWarnings("rawtypes")
89 public static class EmitterWorkerContext extends WorkerContext {
90
91 private static final String FILENAME = "emitter_";
92
93 private DataOutputStream out;
94
95 @Override
96 public void preApplication() {
97 Context context = getContext();
98 FileSystem fs;
99
100 try {
101 fs = FileSystem.get(context.getConfiguration());
102
103 String p = context.getConfiguration()
104 .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
105 if (p == null) {
106 throw new IllegalArgumentException(
107 SimpleVertexWithWorkerContext.OUTPUTDIR +
108 " undefined!");
109 }
110
111 Path path = new Path(p);
112 if (!fs.exists(path)) {
113 throw new IllegalArgumentException(path +
114 " doesn't exist");
115 }
116
117 Path outF = new Path(path, FILENAME +
118 context.getTaskAttemptID());
119 if (fs.exists(outF)) {
120 throw new IllegalArgumentException(outF +
121 " aready exists");
122 }
123
124 out = fs.create(outF);
125 } catch (IOException e) {
126 throw new RuntimeException(
127 "can't initialize WorkerContext", e);
128 }
129 }
130
131 @Override
132 public void postApplication() {
133 if (out != null) {
134 try {
135 out.flush();
136 out.close();
137 } catch (IOException e) {
138 throw new RuntimeException(
139 "can't finalize WorkerContext", e);
140 }
141 out = null;
142 }
143 }
144
145 @Override
146 public void preSuperstep() { }
147
148 @Override
149 public void postSuperstep() { }
150
151
152
153
154
155
156 public void emit(String s) {
157 try {
158 out.writeUTF(s);
159 } catch (IOException e) {
160 throw new RuntimeException("can't emit", e);
161 }
162 }
163 }
164
165 @Override
166 public int run(String[] args) throws Exception {
167 if (args.length != 2) {
168 throw new IllegalArgumentException(
169 "run: Must have 2 arguments <output path> <# of workers>");
170 }
171 GiraphJob job = new GiraphJob(getConf(), getClass().getName());
172 job.getConfiguration().setComputationClass(SimpleComputation.class);
173 job.getConfiguration().setVertexInputFormatClass(
174 SimpleSuperstepVertexInputFormat.class);
175 job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
176 job.getConfiguration().set(
177 SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
178 job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
179 Integer.parseInt(args[1]),
180 100.0f);
181 if (job.run(true)) {
182 return 0;
183 } else {
184 return -1;
185 }
186 }
187
188
189
190
191
192
193
194 public static void main(String[] args) throws Exception {
195 System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
196 }
197 }