This project has retired. For details please refer to its Attic page.
VerifyMessage xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.aggregators.LongSumAggregator;
22  import org.apache.giraph.graph.BasicComputation;
23  import org.apache.giraph.edge.Edge;
24  import org.apache.giraph.edge.EdgeFactory;
25  import org.apache.giraph.master.DefaultMasterCompute;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.worker.WorkerContext;
28  import org.apache.hadoop.io.FloatWritable;
29  import org.apache.hadoop.io.IntWritable;
30  import org.apache.hadoop.io.LongWritable;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.log4j.Logger;
33  
34  import java.io.DataInput;
35  import java.io.DataOutput;
36  import java.io.IOException;
37  
38  /**
39   * An example that simply uses its id, value, and edges to compute new data
40   * every iteration to verify that messages are sent and received at the
41   * appropriate location and superstep.
42   */
43  public class VerifyMessage {
44    /**
45     * Message that will be sent in {@link VerifyMessageComputation}.
46     */
47    public static class VerifiableMessage implements Writable {
48      /** Superstep sent on */
49      private long superstep;
50      /** Source vertex id */
51      private long sourceVertexId;
52      /** Value */
53      private float value;
54  
55      /**
56       * Default constructor used with reflection.
57       */
58      public VerifiableMessage() { }
59  
60      /**
61       * Constructor with verifiable arguments.
62       * @param superstep Superstep this message was created on.
63       * @param sourceVertexId Who send this message.
64       * @param value A value associated with this message.
65       */
66      public VerifiableMessage(
67          long superstep, long sourceVertexId, float value) {
68        this.superstep = superstep;
69        this.sourceVertexId = sourceVertexId;
70        this.value = value;
71      }
72  
73      @Override
74      public void readFields(DataInput input) throws IOException {
75        superstep = input.readLong();
76        sourceVertexId = input.readLong();
77        value = input.readFloat();
78      }
79  
80      @Override
81      public void write(DataOutput output) throws IOException {
82        output.writeLong(superstep);
83        output.writeLong(sourceVertexId);
84        output.writeFloat(value);
85      }
86  
87      @Override
88      public String toString() {
89        return "(superstep=" + superstep + ",sourceVertexId=" +
90            sourceVertexId + ",value=" + value + ")";
91      }
92    }
93  
94    /**
95     * Send and verify messages.
96     */
97    public static class VerifyMessageComputation extends
98        BasicComputation<LongWritable, IntWritable, FloatWritable,
99            VerifiableMessage> {
100     /** Dynamically set number of SUPERSTEPS */
101     public static final String SUPERSTEP_COUNT =
102         "verifyMessageVertex.superstepCount";
103     /** User can access this after the application finishes if local */
104     private static long FINAL_SUM;
105     /** Number of SUPERSTEPS to run (6 by default) */
106     private static int SUPERSTEPS = 6;
107     /** Class logger */
108     private static Logger LOG =
109         Logger.getLogger(VerifyMessageComputation.class);
110 
111     public static long getFinalSum() {
112       return FINAL_SUM;
113     }
114 
115     /**
116      * Worker context used with {@link VerifyMessageComputation}.
117      */
118     public static class VerifyMessageVertexWorkerContext extends
119         WorkerContext {
120       @Override
121       public void preApplication() throws InstantiationException,
122       IllegalAccessException {
123         SUPERSTEPS = getContext().getConfiguration().getInt(
124             SUPERSTEP_COUNT, SUPERSTEPS);
125       }
126 
127       @Override
128       public void postApplication() {
129         LongWritable sumAggregatorValue =
130             getAggregatedValue(LongSumAggregator.class.getName());
131         FINAL_SUM = sumAggregatorValue.get();
132       }
133 
134       @Override
135       public void preSuperstep() {
136       }
137 
138       @Override
139       public void postSuperstep() { }
140     }
141 
142     @Override
143     public void compute(
144         Vertex<LongWritable, IntWritable, FloatWritable> vertex,
145         Iterable<VerifiableMessage> messages) throws IOException {
146       String sumAggregatorName = LongSumAggregator.class.getName();
147       if (getSuperstep() > SUPERSTEPS) {
148         vertex.voteToHalt();
149         return;
150       }
151       if (LOG.isDebugEnabled()) {
152         LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
153       }
154       aggregate(sumAggregatorName, new LongWritable(vertex.getId().get()));
155       if (LOG.isDebugEnabled()) {
156         LOG.debug("compute: sum = " +
157             this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
158             " for vertex " + vertex.getId());
159       }
160       float msgValue = 0.0f;
161       for (VerifiableMessage message : messages) {
162         msgValue += message.value;
163         if (LOG.isDebugEnabled()) {
164           LOG.debug("compute: got msg = " + message +
165               " for vertex id " + vertex.getId() +
166               ", vertex value " + vertex.getValue() +
167               " on superstep " + getSuperstep());
168         }
169         if (message.superstep != getSuperstep() - 1) {
170           throw new IllegalStateException(
171               "compute: Impossible to not get a messsage from " +
172                   "the previous superstep, current superstep = " +
173                   getSuperstep());
174         }
175         if ((message.sourceVertexId != vertex.getId().get() - 1) &&
176             (vertex.getId().get() != 0)) {
177           throw new IllegalStateException(
178               "compute: Impossible that this message didn't come " +
179                   "from the previous vertex and came from " +
180                   message.sourceVertexId);
181         }
182       }
183       int vertexValue = vertex.getValue().get();
184       vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
185       if (LOG.isDebugEnabled()) {
186         LOG.debug("compute: vertex " + vertex.getId() +
187             " has value " + vertex.getValue() +
188             " on superstep " + getSuperstep());
189       }
190       for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
191         FloatWritable newEdgeValue = new FloatWritable(
192             edge.getValue().get() + (float) vertexValue);
193         Edge<LongWritable, FloatWritable> newEdge =
194             EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
195         if (LOG.isDebugEnabled()) {
196           LOG.debug("compute: vertex " + vertex.getId() +
197               " sending edgeValue " + edge.getValue() +
198               " vertexValue " + vertexValue +
199               " total " + newEdgeValue +
200               " to vertex " + edge.getTargetVertexId() +
201               " on superstep " + getSuperstep());
202         }
203         vertex.addEdge(newEdge);
204         sendMessage(edge.getTargetVertexId(),
205             new VerifiableMessage(
206                 getSuperstep(), vertex.getId().get(), newEdgeValue.get()));
207       }
208     }
209   }
210 
211   /**
212    * Master compute associated with {@link VerifyMessageComputation}.
213    * It registers required aggregators.
214    */
215   public static class VerifyMessageMasterCompute extends
216       DefaultMasterCompute {
217     @Override
218     public void initialize() throws InstantiationException,
219         IllegalAccessException {
220       registerAggregator(LongSumAggregator.class.getName(),
221           LongSumAggregator.class);
222     }
223   }
224 }