This project has retired. For details please refer to its Attic page.
SimpleSuperstepComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.edge.Edge;
22  import org.apache.giraph.edge.EdgeFactory;
23  import org.apache.giraph.graph.BasicComputation;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.giraph.io.VertexReader;
26  import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
27  import org.apache.giraph.io.formats.TextVertexOutputFormat;
28  import org.apache.hadoop.io.FloatWritable;
29  import org.apache.hadoop.io.IntWritable;
30  import org.apache.hadoop.io.LongWritable;
31  import org.apache.hadoop.io.Text;
32  import org.apache.hadoop.mapreduce.InputSplit;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  import org.apache.log4j.Logger;
35  
36  import com.google.common.collect.Lists;
37  
38  import java.io.IOException;
39  import java.util.List;
40  
41  /**
42   * Just a simple Vertex compute implementation that executes 3 supersteps, then
43   * finishes.
44   */
45  public class SimpleSuperstepComputation extends BasicComputation<LongWritable,
46      IntWritable, FloatWritable, IntWritable> {
47    @Override
48    public void compute(
49        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
50        Iterable<IntWritable> messages) throws IOException {
51      // Some checks for additional testing
52      if (getTotalNumVertices() < 1) {
53        throw new IllegalStateException("compute: Illegal total vertices " +
54            getTotalNumVertices());
55      }
56      if (getTotalNumEdges() < 0) {
57        throw new IllegalStateException("compute: Illegal total edges " +
58            getTotalNumEdges());
59      }
60      if (vertex.isHalted()) {
61        throw new IllegalStateException("compute: Impossible to be halted - " +
62            vertex.isHalted());
63      }
64  
65      if (getSuperstep() > 3) {
66        vertex.voteToHalt();
67      }
68    }
69  
70    /**
71     * Simple VertexReader that supports {@link SimpleSuperstepComputation}
72     */
73    public static class SimpleSuperstepVertexReader extends
74        GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
75      /** Class logger */
76      private static final Logger LOG =
77          Logger.getLogger(SimpleSuperstepVertexReader.class);
78  
79      @Override
80      public boolean nextVertex() throws IOException, InterruptedException {
81        return totalRecords > recordsRead;
82      }
83  
84      @Override
85      public Vertex<LongWritable, IntWritable, FloatWritable> getCurrentVertex()
86        throws IOException, InterruptedException {
87        Vertex<LongWritable, IntWritable, FloatWritable> vertex =
88            getConf().createVertex();
89        long tmpId = reverseIdOrder ?
90            ((inputSplit.getSplitIndex() + 1) * totalRecords) -
91            recordsRead - 1 :
92              (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
93        LongWritable vertexId = new LongWritable(tmpId);
94        IntWritable vertexValue =
95            new IntWritable((int) (vertexId.get() * 10));
96        List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
97        long targetVertexId =
98            (vertexId.get() + 1) %
99            (inputSplit.getNumSplits() * totalRecords);
100       float edgeValue = vertexId.get() * 100f;
101       edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
102           new FloatWritable(edgeValue)));
103       vertex.initialize(vertexId, vertexValue, edges);
104       ++recordsRead;
105       if (LOG.isInfoEnabled()) {
106         LOG.info("next: Return vertexId=" + vertex.getId().get() +
107             ", vertexValue=" + vertex.getValue() +
108             ", targetVertexId=" + targetVertexId +
109             ", edgeValue=" + edgeValue);
110       }
111       return vertex;
112     }
113   }
114 
115   /**
116    * Simple VertexInputFormat that supports {@link SimpleSuperstepComputation}
117    */
118   public static class SimpleSuperstepVertexInputFormat extends
119     GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
120     @Override
121     public VertexReader<LongWritable, IntWritable, FloatWritable>
122     createVertexReader(InputSplit split, TaskAttemptContext context)
123       throws IOException {
124       return new SimpleSuperstepVertexReader();
125     }
126   }
127 
128 
129   /**
130    * Simple VertexOutputFormat that supports {@link SimpleSuperstepComputation}
131    */
132   public static class SimpleSuperstepVertexOutputFormat extends
133       TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
134     @Override
135     public TextVertexWriter createVertexWriter(TaskAttemptContext context)
136       throws IOException, InterruptedException {
137       return new SimpleSuperstepVertexWriter();
138     }
139 
140     /**
141      * Simple VertexWriter that supports {@link SimpleSuperstepComputation}
142      */
143     public class SimpleSuperstepVertexWriter extends TextVertexWriter {
144       @Override
145       public void writeVertex(Vertex<LongWritable, IntWritable,
146           FloatWritable> vertex) throws IOException, InterruptedException {
147         getRecordWriter().write(
148             new Text(vertex.getId().toString()),
149             new Text(vertex.getValue().toString()));
150       }
151     }
152   }
153 }