This project has retired. For details please refer to its
Attic page.
PageRankWithKryoSimpleWritable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import com.google.common.collect.Lists;
22 import org.apache.giraph.aggregators.BasicAggregator;
23 import org.apache.giraph.aggregators.LongSumAggregator;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.edge.EdgeFactory;
26 import org.apache.giraph.graph.BasicComputation;
27 import org.apache.giraph.graph.Vertex;
28 import org.apache.giraph.io.VertexReader;
29 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
30 import org.apache.giraph.master.DefaultMasterCompute;
31 import org.apache.giraph.worker.WorkerContext;
32 import org.apache.giraph.writable.kryo.KryoSimpleWritable;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.mapreduce.InputSplit;
35 import org.apache.hadoop.mapreduce.TaskAttemptContext;
36 import org.apache.log4j.Logger;
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.List;
41 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue;
42 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue;
43 import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue;
44
45
46
47
48
49 @Algorithm(
50 name = "Page rank"
51 )
52 public class PageRankWithKryoSimpleWritable extends
53 BasicComputation<LongWritable, VertexValue,
54 EdgeValue, MessageValue> {
55
56 public static final int MAX_SUPERSTEPS = 30;
57
58
59 private static final Logger LOG =
60 Logger.getLogger(PageRankWithKryoSimpleWritable.class);
61
62 private static String SUM_AGG = "sum";
63
64 private static String MIN_AGG = "min";
65
66 private static String MAX_AGG = "max";
67
68 @Override
69 public void compute(
70 Vertex<LongWritable, VertexValue,
71 EdgeValue> vertex,
72 Iterable<MessageValue> messages) throws IOException {
73 if (getSuperstep() >= 1) {
74 double sum = 0;
75 for (MessageValue message : messages) {
76 sum += message.get();
77 }
78 Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum;
79 VertexValue vertexValue = new VertexValue(value);
80 vertex.setValue(vertexValue);
81 aggregate(MAX_AGG, vertexValue);
82 aggregate(MIN_AGG, vertexValue);
83 aggregate(SUM_AGG, new LongWritable(1));
84 LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
85 " max=" + getAggregatedValue(MAX_AGG) +
86 " min=" + getAggregatedValue(MIN_AGG));
87 }
88
89 if (getSuperstep() < MAX_SUPERSTEPS) {
90 long edges = vertex.getNumEdges();
91 sendMessageToAllEdges(vertex,
92 new MessageValue(vertex.getValue().get() / edges));
93 } else {
94 vertex.voteToHalt();
95 }
96 }
97
98
99
100
101 public static class PageRankWithKryoWorkerContext extends
102 WorkerContext {
103
104 private static double FINAL_MAX;
105
106 private static double FINAL_MIN;
107
108 private static long FINAL_SUM;
109
110 public static double getFinalMax() {
111 return FINAL_MAX;
112 }
113
114 public static double getFinalMin() {
115 return FINAL_MIN;
116 }
117
118 public static long getFinalSum() {
119 return FINAL_SUM;
120 }
121
122 @Override
123 public void preApplication()
124 throws InstantiationException, IllegalAccessException {
125 }
126
127 @Override
128 public void postApplication() {
129 FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
130 FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get();
131 FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get();
132
133 LOG.info("aggregatedNumVertices=" + FINAL_SUM);
134 LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
135 LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
136 }
137
138 @Override
139 public void preSuperstep() {
140 if (getSuperstep() >= 3) {
141 LOG.info("aggregatedNumVertices=" +
142 getAggregatedValue(SUM_AGG) +
143 " NumVertices=" + getTotalNumVertices());
144 if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
145 getTotalNumVertices()) {
146 throw new RuntimeException("wrong value of SumAggreg: " +
147 getAggregatedValue(SUM_AGG) + ", should be: " +
148 getTotalNumVertices());
149 }
150 VertexValue maxPagerank = getAggregatedValue(MAX_AGG);
151 LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
152 VertexValue minPagerank = getAggregatedValue(MIN_AGG);
153 LOG.info("aggregatedMinPageRank=" + minPagerank.get());
154 }
155 }
156
157 @Override
158 public void postSuperstep() { }
159 }
160
161
162
163
164
165 public static class PageRankWithKryoMasterCompute extends
166 DefaultMasterCompute {
167 @Override
168 public void initialize() throws InstantiationException,
169 IllegalAccessException {
170 registerAggregator(SUM_AGG, LongSumAggregator.class);
171 registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class);
172 registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class);
173 }
174 }
175
176
177
178
179 public static class PageRankWithKryoVertexReader extends
180 GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> {
181
182 private static final Logger LOG =
183 Logger.getLogger(
184 PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class);
185
186 @Override
187 public boolean nextVertex() {
188 return totalRecords > recordsRead;
189 }
190
191 @Override
192 public Vertex<LongWritable, VertexValue, EdgeValue>
193 getCurrentVertex() throws IOException {
194 Vertex<LongWritable, VertexValue, EdgeValue> vertex =
195 getConf().createVertex();
196 LongWritable vertexId = new LongWritable(
197 (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
198 VertexValue vertexValue = new VertexValue(vertexId.get() * 10d);
199 long targetVertexId =
200 (vertexId.get() + 1) %
201 (inputSplit.getNumSplits() * totalRecords);
202 float edgeValue = vertexId.get() * 100f;
203 List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList();
204 edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
205 new EdgeValue(edgeValue)));
206 vertex.initialize(vertexId, vertexValue, edges);
207 ++recordsRead;
208 if (LOG.isInfoEnabled()) {
209 LOG.info("next: Return vertexId=" + vertex.getId().get() +
210 ", vertexValue=" + vertex.getValue() +
211 ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
212 }
213 return vertex;
214 }
215 }
216
217
218
219
220 public static class PageRankWithKryoVertexInputFormat extends
221 GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> {
222 @Override
223 public VertexReader<LongWritable, VertexValue,
224 EdgeValue> createVertexReader(InputSplit split,
225 TaskAttemptContext context)
226 throws IOException {
227 return new PageRankWithKryoVertexReader();
228 }
229 }
230
231
232
233
234
235
236
237
238
239 public static class VertexValue extends KryoSimpleWritable {
240
241
242
243 private double[] ranks;
244
245
246 public VertexValue() {
247 }
248
249
250
251
252
253 public VertexValue(Double val) {
254 ranks = new double[1];
255
256 ranks[0] = val;
257 }
258
259
260
261
262
263 public Double get() {
264 return ranks[0];
265 }
266
267
268
269
270
271 public void set(Double val) {
272 this.ranks[0] = val;
273 }
274 }
275
276
277
278
279
280
281
282
283
284 public static class EdgeValue extends KryoSimpleWritable {
285
286 private Float realValue;
287
288
289 public EdgeValue() {
290 }
291
292
293
294
295 public EdgeValue(Float val) {
296 realValue = val;
297 }
298
299
300
301
302
303 public Float get() {
304 return realValue;
305 }
306
307
308
309
310
311 public void set(Float val) {
312 this.realValue = val;
313 }
314 }
315
316
317
318
319
320
321
322
323
324 public static class MessageValue extends KryoSimpleWritable {
325
326 private List<Double> msgValue;
327
328
329 public MessageValue() {
330 }
331
332
333
334
335
336 public MessageValue(Double val) {
337 msgValue = new ArrayList<>();
338 msgValue.add(val);
339 }
340
341
342
343
344
345 public Double get() {
346 return msgValue.get(0);
347 }
348
349
350
351
352
353 public void set(Double val) {
354 this.msgValue.set(0, val);
355 }
356 }
357
358
359
360
361
362 public static class DoubleMaxWrapperAggregator extends
363 BasicAggregator<VertexValue> {
364 @Override
365 public void aggregate(VertexValue value) {
366 getAggregatedValue().set(
367 Math.max(getAggregatedValue().get(), value.get()));
368 }
369
370 @Override
371 public VertexValue createInitialValue() {
372 return new VertexValue(Double.NEGATIVE_INFINITY);
373 }
374 }
375
376
377
378
379 public static class DoubleMinWrapperAggregator
380 extends BasicAggregator<VertexValue> {
381 @Override
382 public void aggregate(VertexValue value) {
383 getAggregatedValue().set(
384 Math.min(getAggregatedValue().get(), value.get()));
385 }
386
387 @Override
388 public VertexValue createInitialValue() {
389 return new VertexValue(Double.MAX_VALUE);
390 }
391 }
392
393 }