This project has retired. For details please refer to its Attic page.
SimpleFailComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.hadoop.io.DoubleWritable;
24  import org.apache.hadoop.io.FloatWritable;
25  import org.apache.hadoop.io.LongWritable;
26  import org.apache.log4j.Logger;
27  
28  import java.io.IOException;
29  
30  /**
31   * Vertex to allow unit testing of failure detection
32   */
33  public class SimpleFailComputation extends BasicComputation<
34      LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
35    /** Class logger */
36    private static Logger LOG = Logger.getLogger(SimpleFailComputation.class);
37    /** TODO: Change this behavior to WorkerContext */
38    private static long SUPERSTEP = 0;
39  
40    @Override
41    public void compute(
42        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
43        Iterable<DoubleWritable> messages) throws IOException {
44      if (getSuperstep() >= 1) {
45        double sum = 0;
46        for (DoubleWritable message : messages) {
47          sum += message.get();
48        }
49        DoubleWritable vertexValue =
50            new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
51        vertex.setValue(vertexValue);
52        if (getSuperstep() < 30) {
53          if (getSuperstep() == 20) {
54            if (vertex.getId().get() == 10L) {
55              try {
56                Thread.sleep(2000);
57              } catch (InterruptedException e) {
58                LOG.info("Sleep interrupted ", e);
59              }
60              System.exit(1);
61            } else if (getSuperstep() - SUPERSTEP > 10) {
62              return;
63            }
64          }
65          long edges = vertex.getNumEdges();
66          sendMessageToAllEdges(vertex,
67              new DoubleWritable(vertex.getValue().get() / edges));
68        } else {
69          vertex.voteToHalt();
70        }
71        setSuperstep(getSuperstep());
72      }
73    }
74  
75    /**
76     * Set the superstep
77     *
78     * @param superstep to set
79     */
80    private static void setSuperstep(long superstep) {
81      SUPERSTEP = superstep;
82    }
83  }