1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import org.apache.giraph.graph.BasicComputation;
22import org.apache.giraph.graph.Vertex;
23import org.apache.giraph.worker.DefaultWorkerContext;
24import org.apache.hadoop.io.DoubleWritable;
25import org.apache.hadoop.io.FloatWritable;
26import org.apache.hadoop.io.LongWritable;
2728import java.io.IOException;
29import java.util.concurrent.atomic.AtomicLong;
3031/**32 * Vertex to test the local variables in Computation, and pre/postSuperstep33 * methods34 */35publicclassTestComputationStateComputationextends BasicComputation<
36 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
37/** How many compute threads to use in the test */38publicstaticfinalint NUM_COMPUTE_THREADS = 10;
39/** How many vertices to create for the test */40publicstaticfinalint NUM_VERTICES = 100;
41/** How many partitions to have */42publicstaticfinalint NUM_PARTITIONS = 25;
4344/**45 * The counter should hold the number of vertices in this partition,46 * plus the current superstep47 */48privatelong counter;
4950 @Override
51publicvoid compute(
52 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
53 Iterable<DoubleWritable> messages) throws IOException {
54 counter++;
55if (getSuperstep() > 5) {
56 vertex.voteToHalt();
57 }
58 }
5960 @Override
61publicvoid preSuperstep() {
62 counter =
63 ((TestComputationStateWorkerContext) getWorkerContext()).superstepCounter;
64 }
6566 @Override
67publicvoid postSuperstep() {
68 ((TestComputationStateWorkerContext) getWorkerContext()).totalCounter
69 .addAndGet(counter);
70 }
7172/**73 * WorkerContext for TestComputationState74 */75publicstaticclassTestComputationStateWorkerContextextends76DefaultWorkerContext {
77/** Current superstep */78privatelong superstepCounter;
79/**80 * This counter should hold the sum of Computation's counters81 */82private AtomicLong totalCounter;
8384 @Override
85publicvoid preSuperstep() {
86 superstepCounter = getSuperstep();
87 totalCounter = new AtomicLong(0);
88 }
8990 @Override
91publicvoid postSuperstep() {
92 assertEquals(totalCounter.get(),
93 NUM_COMPUTE_THREADS * superstepCounter + getTotalNumVertices());
94 }
95 }
9697/**98 * Throws exception if values are not equal.99 *100 * @param expected Expected value101 * @param actual Actual value102 */103privatestaticvoid assertEquals(long expected, long actual) {
104if (expected != actual) {
105thrownew RuntimeException("expected: " + expected +
106", actual: " + actual);
107 }
108 }
109 }