This project has retired. For details please refer to its
Attic page.
SimpleMutateGraphComputation xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.edge.EdgeFactory;
23 import org.apache.giraph.graph.Vertex;
24 import org.apache.giraph.worker.WorkerContext;
25 import org.apache.hadoop.io.DoubleWritable;
26 import org.apache.hadoop.io.FloatWritable;
27 import org.apache.hadoop.io.LongWritable;
28 import org.apache.log4j.Logger;
29
30 import java.io.IOException;
31
32
33
34
35 public class SimpleMutateGraphComputation extends BasicComputation<
36 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
37
38 private static Logger LOG =
39 Logger.getLogger(SimpleMutateGraphComputation.class);
40
41 private long maxRanges = 100;
42
43
44
45
46
47
48
49
50
51 private long rangeVertexIdStart(int range) {
52 return (Long.MAX_VALUE / maxRanges) * range;
53 }
54
55 @Override
56 public void compute(
57 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
58 Iterable<DoubleWritable> messages) throws IOException {
59 SimpleMutateGraphVertexWorkerContext workerContext = getWorkerContext();
60 if (getSuperstep() == 0) {
61 LOG.debug("Reached superstep " + getSuperstep());
62 } else if (getSuperstep() == 1) {
63
64
65 LongWritable destVertexId =
66 new LongWritable(rangeVertexIdStart(1) + vertex.getId().get());
67 sendMessage(destVertexId, new DoubleWritable(0.0));
68 } else if (getSuperstep() == 2) {
69 LOG.debug("Reached superstep " + getSuperstep());
70 } else if (getSuperstep() == 3) {
71 long vertexCount = workerContext.getVertexCount();
72 if (vertexCount * 2 != getTotalNumVertices()) {
73 throw new IllegalStateException(
74 "Impossible to have " + getTotalNumVertices() +
75 " vertices when should have " + vertexCount * 2 +
76 " on superstep " + getSuperstep());
77 }
78 long edgeCount = workerContext.getEdgeCount();
79 if (edgeCount != getTotalNumEdges()) {
80 throw new IllegalStateException(
81 "Impossible to have " + getTotalNumEdges() +
82 " edges when should have " + edgeCount +
83 " on superstep " + getSuperstep());
84 }
85
86 LongWritable vertexIndex =
87 new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
88 addVertexRequest(vertexIndex, new DoubleWritable(0.0));
89
90 addEdgeRequest(vertexIndex,
91 EdgeFactory.create(vertex.getId(), new FloatWritable(0.0f)));
92 } else if (getSuperstep() == 4) {
93 LOG.debug("Reached superstep " + getSuperstep());
94 } else if (getSuperstep() == 5) {
95 long vertexCount = workerContext.getVertexCount();
96 if (vertexCount * 2 != getTotalNumVertices()) {
97 throw new IllegalStateException(
98 "Impossible to have " + getTotalNumVertices() +
99 " when should have " + vertexCount * 2 +
100 " on superstep " + getSuperstep());
101 }
102 long edgeCount = workerContext.getEdgeCount();
103 if (edgeCount + vertexCount != getTotalNumEdges()) {
104 throw new IllegalStateException(
105 "Impossible to have " + getTotalNumEdges() +
106 " edges when should have " + edgeCount + vertexCount +
107 " on superstep " + getSuperstep());
108 }
109
110 LongWritable vertexIndex =
111 new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
112 workerContext.increaseEdgesRemoved();
113 removeEdgesRequest(vertexIndex, vertex.getId());
114 } else if (getSuperstep() == 6) {
115
116 if (vertex.getId().compareTo(
117 new LongWritable(rangeVertexIdStart(3))) >= 0) {
118 removeVertexRequest(vertex.getId());
119 }
120 } else if (getSuperstep() == 7) {
121 long origEdgeCount = workerContext.getOrigEdgeCount();
122 if (origEdgeCount != getTotalNumEdges()) {
123 throw new IllegalStateException(
124 "Impossible to have " + getTotalNumEdges() +
125 " edges when should have " + origEdgeCount +
126 " on superstep " + getSuperstep());
127 }
128 } else if (getSuperstep() == 8) {
129 long vertexCount = workerContext.getVertexCount();
130 if (vertexCount / 2 != getTotalNumVertices()) {
131 throw new IllegalStateException(
132 "Impossible to have " + getTotalNumVertices() +
133 " vertices when should have " + vertexCount / 2 +
134 " on superstep " + getSuperstep());
135 }
136 } else if (getSuperstep() == 9) {
137
138
139 if (vertex.getId().compareTo(
140 new LongWritable(rangeVertexIdStart(1))) >= 0) {
141
142 removeVertexRequest(vertex.getId());
143 } else {
144
145
146 sendMessage(
147 new LongWritable(rangeVertexIdStart(1) + vertex.getId().get()),
148 new DoubleWritable(0.0));
149 }
150 } else if (getSuperstep() == 10) {
151 LOG.debug("Reached superstep " + getSuperstep());
152 } else if (getSuperstep() == 11) {
153 long vertexCount = workerContext.getVertexCount();
154 if (vertexCount / 2 != getTotalNumVertices()) {
155 throw new IllegalStateException(
156 "Impossible to have " + getTotalNumVertices() +
157 " vertices when should have " + vertexCount / 2 +
158 " on superstep " + getSuperstep());
159 }
160 } else {
161 vertex.voteToHalt();
162 }
163 }
164
165
166
167
168 public static class SimpleMutateGraphVertexWorkerContext
169 extends WorkerContext {
170
171 private long vertexCount;
172
173 private long edgeCount;
174
175 private long origEdgeCount;
176
177 private int edgesRemoved = 0;
178
179 @Override
180 public void preApplication()
181 throws InstantiationException, IllegalAccessException { }
182
183 @Override
184 public void postApplication() { }
185
186 @Override
187 public void preSuperstep() { }
188
189 @Override
190 public void postSuperstep() {
191 vertexCount = getTotalNumVertices();
192 edgeCount = getTotalNumEdges();
193 if (getSuperstep() == 1) {
194 origEdgeCount = edgeCount;
195 }
196 LOG.info("Got " + vertexCount + " vertices, " +
197 edgeCount + " edges on superstep " +
198 getSuperstep());
199 LOG.info("Removed " + edgesRemoved);
200 edgesRemoved = 0;
201 }
202
203 public long getVertexCount() {
204 return vertexCount;
205 }
206
207 public long getEdgeCount() {
208 return edgeCount;
209 }
210
211 public long getOrigEdgeCount() {
212 return origEdgeCount;
213 }
214
215
216
217
218 public void increaseEdgesRemoved() {
219 this.edgesRemoved++;
220 }
221 }
222 }