This project has retired. For details please refer to its Attic page.
TestComputationStateComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.worker.DefaultWorkerContext;
24  import org.apache.hadoop.io.DoubleWritable;
25  import org.apache.hadoop.io.FloatWritable;
26  import org.apache.hadoop.io.LongWritable;
27  
28  import java.io.IOException;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  /**
32   * Vertex to test the local variables in Computation, and pre/postSuperstep
33   * methods
34   */
35  public class TestComputationStateComputation extends BasicComputation<
36      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
37    /** How many compute threads to use in the test */
38    public static final int NUM_COMPUTE_THREADS = 10;
39    /** How many vertices to create for the test */
40    public static final int NUM_VERTICES = 100;
41    /** How many partitions to have */
42    public static final int NUM_PARTITIONS = 25;
43  
44    /**
45     * The counter should hold the number of vertices in this partition,
46     * plus the current superstep
47     */
48    private long counter;
49  
50    @Override
51    public void compute(
52        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
53        Iterable<DoubleWritable> messages) throws IOException {
54      counter++;
55      if (getSuperstep() > 5) {
56        vertex.voteToHalt();
57      }
58    }
59  
60    @Override
61    public void preSuperstep() {
62      counter =
63        ((TestComputationStateWorkerContext) getWorkerContext()).superstepCounter;
64    }
65  
66    @Override
67    public void postSuperstep() {
68      ((TestComputationStateWorkerContext) getWorkerContext()).totalCounter
69          .addAndGet(counter);
70    }
71  
72    /**
73     * WorkerContext for TestComputationState
74     */
75    public static class TestComputationStateWorkerContext extends
76        DefaultWorkerContext {
77      /** Current superstep */
78      private long superstepCounter;
79      /**
80       * This counter should hold the sum of Computation's counters
81       */
82      private AtomicLong totalCounter;
83  
84      @Override
85      public void preSuperstep() {
86        superstepCounter = getSuperstep();
87        totalCounter = new AtomicLong(0);
88      }
89  
90      @Override
91      public void postSuperstep() {
92        assertEquals(totalCounter.get(),
93            NUM_COMPUTE_THREADS * superstepCounter + getTotalNumVertices());
94      }
95    }
96  
97    /**
98     * Throws exception if values are not equal.
99     *
100    * @param expected Expected value
101    * @param actual   Actual value
102    */
103   private static void assertEquals(long expected, long actual) {
104     if (expected != actual) {
105       throw new RuntimeException("expected: " + expected +
106           ", actual: " + actual);
107     }
108   }
109 }