This project has retired. For details please refer to its
Attic page.
VerifyMessage xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.aggregators.LongSumAggregator;
22 import org.apache.giraph.graph.BasicComputation;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.edge.EdgeFactory;
25 import org.apache.giraph.master.DefaultMasterCompute;
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.worker.WorkerContext;
28 import org.apache.hadoop.io.FloatWritable;
29 import org.apache.hadoop.io.IntWritable;
30 import org.apache.hadoop.io.LongWritable;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.log4j.Logger;
33
34 import java.io.DataInput;
35 import java.io.DataOutput;
36 import java.io.IOException;
37
38
39
40
41
42
43 public class VerifyMessage {
44
45
46
47 public static class VerifiableMessage implements Writable {
48
49 private long superstep;
50
51 private long sourceVertexId;
52
53 private float value;
54
55
56
57
58 public VerifiableMessage() { }
59
60
61
62
63
64
65
66 public VerifiableMessage(
67 long superstep, long sourceVertexId, float value) {
68 this.superstep = superstep;
69 this.sourceVertexId = sourceVertexId;
70 this.value = value;
71 }
72
73 @Override
74 public void readFields(DataInput input) throws IOException {
75 superstep = input.readLong();
76 sourceVertexId = input.readLong();
77 value = input.readFloat();
78 }
79
80 @Override
81 public void write(DataOutput output) throws IOException {
82 output.writeLong(superstep);
83 output.writeLong(sourceVertexId);
84 output.writeFloat(value);
85 }
86
87 @Override
88 public String toString() {
89 return "(superstep=" + superstep + ",sourceVertexId=" +
90 sourceVertexId + ",value=" + value + ")";
91 }
92 }
93
94
95
96
97 public static class VerifyMessageComputation extends
98 BasicComputation<LongWritable, IntWritable, FloatWritable,
99 VerifiableMessage> {
100
101 public static final String SUPERSTEP_COUNT =
102 "verifyMessageVertex.superstepCount";
103
104 private static long FINAL_SUM;
105
106 private static int SUPERSTEPS = 6;
107
108 private static Logger LOG =
109 Logger.getLogger(VerifyMessageComputation.class);
110
111 public static long getFinalSum() {
112 return FINAL_SUM;
113 }
114
115
116
117
118 public static class VerifyMessageVertexWorkerContext extends
119 WorkerContext {
120 @Override
121 public void preApplication() throws InstantiationException,
122 IllegalAccessException {
123 SUPERSTEPS = getContext().getConfiguration().getInt(
124 SUPERSTEP_COUNT, SUPERSTEPS);
125 }
126
127 @Override
128 public void postApplication() {
129 LongWritable sumAggregatorValue =
130 getAggregatedValue(LongSumAggregator.class.getName());
131 FINAL_SUM = sumAggregatorValue.get();
132 }
133
134 @Override
135 public void preSuperstep() {
136 }
137
138 @Override
139 public void postSuperstep() { }
140 }
141
142 @Override
143 public void compute(
144 Vertex<LongWritable, IntWritable, FloatWritable> vertex,
145 Iterable<VerifiableMessage> messages) throws IOException {
146 String sumAggregatorName = LongSumAggregator.class.getName();
147 if (getSuperstep() > SUPERSTEPS) {
148 vertex.voteToHalt();
149 return;
150 }
151 if (LOG.isDebugEnabled()) {
152 LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
153 }
154 aggregate(sumAggregatorName, new LongWritable(vertex.getId().get()));
155 if (LOG.isDebugEnabled()) {
156 LOG.debug("compute: sum = " +
157 this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
158 " for vertex " + vertex.getId());
159 }
160 float msgValue = 0.0f;
161 for (VerifiableMessage message : messages) {
162 msgValue += message.value;
163 if (LOG.isDebugEnabled()) {
164 LOG.debug("compute: got msg = " + message +
165 " for vertex id " + vertex.getId() +
166 ", vertex value " + vertex.getValue() +
167 " on superstep " + getSuperstep());
168 }
169 if (message.superstep != getSuperstep() - 1) {
170 throw new IllegalStateException(
171 "compute: Impossible to not get a messsage from " +
172 "the previous superstep, current superstep = " +
173 getSuperstep());
174 }
175 if ((message.sourceVertexId != vertex.getId().get() - 1) &&
176 (vertex.getId().get() != 0)) {
177 throw new IllegalStateException(
178 "compute: Impossible that this message didn't come " +
179 "from the previous vertex and came from " +
180 message.sourceVertexId);
181 }
182 }
183 int vertexValue = vertex.getValue().get();
184 vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
185 if (LOG.isDebugEnabled()) {
186 LOG.debug("compute: vertex " + vertex.getId() +
187 " has value " + vertex.getValue() +
188 " on superstep " + getSuperstep());
189 }
190 for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
191 FloatWritable newEdgeValue = new FloatWritable(
192 edge.getValue().get() + (float) vertexValue);
193 Edge<LongWritable, FloatWritable> newEdge =
194 EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("compute: vertex " + vertex.getId() +
197 " sending edgeValue " + edge.getValue() +
198 " vertexValue " + vertexValue +
199 " total " + newEdgeValue +
200 " to vertex " + edge.getTargetVertexId() +
201 " on superstep " + getSuperstep());
202 }
203 vertex.addEdge(newEdge);
204 sendMessage(edge.getTargetVertexId(),
205 new VerifiableMessage(
206 getSuperstep(), vertex.getId().get(), newEdgeValue.get()));
207 }
208 }
209 }
210
211
212
213
214
215 public static class VerifyMessageMasterCompute extends
216 DefaultMasterCompute {
217 @Override
218 public void initialize() throws InstantiationException,
219 IllegalAccessException {
220 registerAggregator(LongSumAggregator.class.getName(),
221 LongSumAggregator.class);
222 }
223 }
224 }