This project has retired. For details please refer to its
Attic page.
SimpleSuperstepComputation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.edge.Edge;
22 import org.apache.giraph.edge.EdgeFactory;
23 import org.apache.giraph.graph.BasicComputation;
24 import org.apache.giraph.graph.Vertex;
25 import org.apache.giraph.io.VertexReader;
26 import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
27 import org.apache.giraph.io.formats.TextVertexOutputFormat;
28 import org.apache.hadoop.io.FloatWritable;
29 import org.apache.hadoop.io.IntWritable;
30 import org.apache.hadoop.io.LongWritable;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapreduce.InputSplit;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34 import org.apache.log4j.Logger;
35
36 import com.google.common.collect.Lists;
37
38 import java.io.IOException;
39 import java.util.List;
40
41
42
43
44
45 public class SimpleSuperstepComputation extends BasicComputation<LongWritable,
46 IntWritable, FloatWritable, IntWritable> {
47 @Override
48 public void compute(
49 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
50 Iterable<IntWritable> messages) throws IOException {
51
52 if (getTotalNumVertices() < 1) {
53 throw new IllegalStateException("compute: Illegal total vertices " +
54 getTotalNumVertices());
55 }
56 if (getTotalNumEdges() < 0) {
57 throw new IllegalStateException("compute: Illegal total edges " +
58 getTotalNumEdges());
59 }
60 if (vertex.isHalted()) {
61 throw new IllegalStateException("compute: Impossible to be halted - " +
62 vertex.isHalted());
63 }
64
65 if (getSuperstep() > 3) {
66 vertex.voteToHalt();
67 }
68 }
69
70
71
72
73 public static class SimpleSuperstepVertexReader extends
74 GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
75
76 private static final Logger LOG =
77 Logger.getLogger(SimpleSuperstepVertexReader.class);
78
79 @Override
80 public boolean nextVertex() throws IOException, InterruptedException {
81 return totalRecords > recordsRead;
82 }
83
84 @Override
85 public Vertex<LongWritable, IntWritable, FloatWritable> getCurrentVertex()
86 throws IOException, InterruptedException {
87 Vertex<LongWritable, IntWritable, FloatWritable> vertex =
88 getConf().createVertex();
89 long tmpId = reverseIdOrder ?
90 ((inputSplit.getSplitIndex() + 1) * totalRecords) -
91 recordsRead - 1 :
92 (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
93 LongWritable vertexId = new LongWritable(tmpId);
94 IntWritable vertexValue =
95 new IntWritable((int) (vertexId.get() * 10));
96 List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
97 long targetVertexId =
98 (vertexId.get() + 1) %
99 (inputSplit.getNumSplits() * totalRecords);
100 float edgeValue = vertexId.get() * 100f;
101 edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
102 new FloatWritable(edgeValue)));
103 vertex.initialize(vertexId, vertexValue, edges);
104 ++recordsRead;
105 if (LOG.isInfoEnabled()) {
106 LOG.info("next: Return vertexId=" + vertex.getId().get() +
107 ", vertexValue=" + vertex.getValue() +
108 ", targetVertexId=" + targetVertexId +
109 ", edgeValue=" + edgeValue);
110 }
111 return vertex;
112 }
113 }
114
115
116
117
118 public static class SimpleSuperstepVertexInputFormat extends
119 GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
120 @Override
121 public VertexReader<LongWritable, IntWritable, FloatWritable>
122 createVertexReader(InputSplit split, TaskAttemptContext context)
123 throws IOException {
124 return new SimpleSuperstepVertexReader();
125 }
126 }
127
128
129
130
131
132 public static class SimpleSuperstepVertexOutputFormat extends
133 TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
134 @Override
135 public TextVertexWriter createVertexWriter(TaskAttemptContext context)
136 throws IOException, InterruptedException {
137 return new SimpleSuperstepVertexWriter();
138 }
139
140
141
142
143 public class SimpleSuperstepVertexWriter extends TextVertexWriter {
144 @Override
145 public void writeVertex(Vertex<LongWritable, IntWritable,
146 FloatWritable> vertex) throws IOException, InterruptedException {
147 getRecordWriter().write(
148 new Text(vertex.getId().toString()),
149 new Text(vertex.getValue().toString()));
150 }
151 }
152 }
153 }