This project has retired. For details please refer to its
Attic page.
SimplePageRankComputation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.aggregators.DoubleMaxAggregator;
22 import org.apache.giraph.aggregators.DoubleMinAggregator;
23 import org.apache.giraph.aggregators.LongSumAggregator;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.edge.EdgeFactory;
26 import org.apache.giraph.graph.BasicComputation;
27 import org.apache.giraph.graph.Vertex;
28 import org.apache.giraph.io.VertexReader;
29 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
30 import org.apache.giraph.io.formats.TextVertexOutputFormat;
31 import org.apache.giraph.master.DefaultMasterCompute;
32 import org.apache.giraph.worker.WorkerContext;
33 import org.apache.hadoop.io.DoubleWritable;
34 import org.apache.hadoop.io.FloatWritable;
35 import org.apache.hadoop.io.LongWritable;
36 import org.apache.hadoop.io.Text;
37 import org.apache.hadoop.mapreduce.InputSplit;
38 import org.apache.hadoop.mapreduce.TaskAttemptContext;
39 import org.apache.log4j.Logger;
40
41 import com.google.common.collect.Lists;
42
43 import java.io.IOException;
44 import java.util.List;
45
46
47
48
49 @Algorithm(
50 name = "Page rank"
51 )
52 public class SimplePageRankComputation extends BasicComputation<LongWritable,
53 DoubleWritable, FloatWritable, DoubleWritable> {
54
55 public static final int MAX_SUPERSTEPS = 30;
56
57 private static final Logger LOG =
58 Logger.getLogger(SimplePageRankComputation.class);
59
60 private static String SUM_AGG = "sum";
61
62 private static String MIN_AGG = "min";
63
64 private static String MAX_AGG = "max";
65
66 @Override
67 public void compute(
68 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
69 Iterable<DoubleWritable> messages) throws IOException {
70 if (getSuperstep() >= 1) {
71 double sum = 0;
72 for (DoubleWritable message : messages) {
73 sum += message.get();
74 }
75 DoubleWritable vertexValue =
76 new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
77 vertex.setValue(vertexValue);
78 aggregate(MAX_AGG, vertexValue);
79 aggregate(MIN_AGG, vertexValue);
80 aggregate(SUM_AGG, new LongWritable(1));
81 LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
82 " max=" + getAggregatedValue(MAX_AGG) +
83 " min=" + getAggregatedValue(MIN_AGG));
84 }
85
86 if (getSuperstep() < MAX_SUPERSTEPS) {
87 long edges = vertex.getNumEdges();
88 sendMessageToAllEdges(vertex,
89 new DoubleWritable(vertex.getValue().get() / edges));
90 } else {
91 vertex.voteToHalt();
92 }
93 }
94
95
96
97
98 public static class SimplePageRankWorkerContext extends
99 WorkerContext {
100
101 private static double FINAL_MAX;
102
103 private static double FINAL_MIN;
104
105 private static long FINAL_SUM;
106
107 public static double getFinalMax() {
108 return FINAL_MAX;
109 }
110
111 public static double getFinalMin() {
112 return FINAL_MIN;
113 }
114
115 public static long getFinalSum() {
116 return FINAL_SUM;
117 }
118
119 @Override
120 public void preApplication()
121 throws InstantiationException, IllegalAccessException {
122 }
123
124 @Override
125 public void postApplication() {
126 FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
127 FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
128 FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
129
130 LOG.info("aggregatedNumVertices=" + FINAL_SUM);
131 LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
132 LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
133 }
134
135 @Override
136 public void preSuperstep() {
137 if (getSuperstep() >= 3) {
138 LOG.info("aggregatedNumVertices=" +
139 getAggregatedValue(SUM_AGG) +
140 " NumVertices=" + getTotalNumVertices());
141 if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
142 getTotalNumVertices()) {
143 throw new RuntimeException("wrong value of SumAggreg: " +
144 getAggregatedValue(SUM_AGG) + ", should be: " +
145 getTotalNumVertices());
146 }
147 DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
148 LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
149 DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
150 LOG.info("aggregatedMinPageRank=" + minPagerank.get());
151 }
152 }
153
154 @Override
155 public void postSuperstep() { }
156 }
157
158
159
160
161
162 public static class SimplePageRankMasterCompute extends
163 DefaultMasterCompute {
164 @Override
165 public void initialize() throws InstantiationException,
166 IllegalAccessException {
167 registerAggregator(SUM_AGG, LongSumAggregator.class);
168 registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
169 registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
170 }
171 }
172
173
174
175
176 public static class SimplePageRankVertexReader extends
177 GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
178
179 private static final Logger LOG =
180 Logger.getLogger(SimplePageRankVertexReader.class);
181
182 @Override
183 public boolean nextVertex() {
184 return totalRecords > recordsRead;
185 }
186
187 @Override
188 public Vertex<LongWritable, DoubleWritable, FloatWritable>
189 getCurrentVertex() throws IOException {
190 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
191 getConf().createVertex();
192 LongWritable vertexId = new LongWritable(
193 (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
194 DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
195 long targetVertexId =
196 (vertexId.get() + 1) %
197 (inputSplit.getNumSplits() * totalRecords);
198 float edgeValue = vertexId.get() * 100f;
199 List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
200 edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
201 new FloatWritable(edgeValue)));
202 vertex.initialize(vertexId, vertexValue, edges);
203 ++recordsRead;
204 if (LOG.isInfoEnabled()) {
205 LOG.info("next: Return vertexId=" + vertex.getId().get() +
206 ", vertexValue=" + vertex.getValue() +
207 ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
208 }
209 return vertex;
210 }
211 }
212
213
214
215
216 public static class SimplePageRankVertexInputFormat extends
217 GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
218 @Override
219 public VertexReader<LongWritable, DoubleWritable,
220 FloatWritable> createVertexReader(InputSplit split,
221 TaskAttemptContext context)
222 throws IOException {
223 return new SimplePageRankVertexReader();
224 }
225 }
226
227
228
229
230 public static class SimplePageRankVertexOutputFormat extends
231 TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
232 @Override
233 public TextVertexWriter createVertexWriter(TaskAttemptContext context)
234 throws IOException, InterruptedException {
235 return new SimplePageRankVertexWriter();
236 }
237
238
239
240
241 public class SimplePageRankVertexWriter extends TextVertexWriter {
242 @Override
243 public void writeVertex(
244 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
245 throws IOException, InterruptedException {
246 getRecordWriter().write(
247 new Text(vertex.getId().toString()),
248 new Text(vertex.getValue().toString()));
249 }
250 }
251 }
252 }