This project has retired. For details please refer to its Attic page.
TestMasterObserver xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.master;
20  
21  import org.apache.giraph.graph.BasicComputation;
22  import org.apache.giraph.conf.GiraphConfiguration;
23  import org.apache.giraph.conf.GiraphConstants;
24  import org.apache.giraph.edge.ByteArrayEdges;
25  import org.apache.giraph.edge.Edge;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.io.formats.TextVertexInputFormat;
28  import org.apache.giraph.utils.InternalVertexRunner;
29  import org.apache.hadoop.io.IntWritable;
30  import org.apache.hadoop.io.NullWritable;
31  import org.apache.hadoop.io.Text;
32  import org.apache.hadoop.mapreduce.InputSplit;
33  import org.apache.hadoop.mapreduce.TaskAttemptContext;
34  import org.junit.Test;
35  
36  import java.io.IOException;
37  
38  import static org.apache.hadoop.util.StringUtils.arrayToString;
39  import static org.junit.Assert.assertEquals;
40  
41  import com.google.common.collect.ImmutableList;
42  
43  public class TestMasterObserver {
44    public static class SimpleComputation extends BasicComputation<IntWritable,
45        IntWritable, NullWritable, NullWritable> {
46      @Override
47      public void compute(
48          Vertex<IntWritable, IntWritable, NullWritable> vertex,
49          Iterable<NullWritable> messages) throws IOException {
50        int currentValue = vertex.getValue().get();
51        if (currentValue == 2) {
52          vertex.voteToHalt();
53        }
54        vertex.setValue(new IntWritable(currentValue + 1));
55      }
56    }
57  
58    public static class InputFormat extends TextVertexInputFormat<
59        IntWritable, IntWritable, NullWritable> {
60      @Override
61      public TextVertexReader createVertexReader(
62          InputSplit split, TaskAttemptContext context) throws IOException {
63        return new TextVertexReaderFromEachLine() {
64          @Override
65          protected IntWritable getId(Text line) throws IOException {
66            return new IntWritable(Integer.parseInt(line.toString()));
67          }
68  
69          @Override
70          protected IntWritable getValue(Text line) throws IOException {
71            return new IntWritable(0);
72          }
73  
74          @Override
75          protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
76              Text line) throws IOException {
77            return ImmutableList.of();
78          }
79        };
80      }
81    }
82  
83    public static class Obs extends DefaultMasterObserver {
84      public static int preApp = 0;
85      public static int preSuperstep = 0;
86      public static int postSuperstep = 0;
87      public static int postApp = 0;
88  
89      @Override
90      public void preApplication() {
91        ++preApp;
92      }
93  
94      @Override
95      public void postApplication() {
96        ++postApp;
97      }
98  
99      @Override
100     public void preSuperstep(long superstep) {
101       ++preSuperstep;
102     }
103 
104     @Override
105     public void postSuperstep(long superstep) {
106       ++postSuperstep;
107     }
108   }
109 
110   @Test
111   public void testGetsCalled() throws Exception {
112     assertEquals(0, Obs.postApp);
113 
114     String[] graph = new String[] { "1", "2", "3" };
115 
116     String klasses[] = new String[] {
117         Obs.class.getName(),
118         Obs.class.getName()
119     };
120 
121     GiraphConfiguration conf = new GiraphConfiguration();
122     conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
123         arrayToString(klasses));
124     conf.setComputationClass(SimpleComputation.class);
125     conf.setOutEdgesClass(ByteArrayEdges.class);
126     conf.setVertexInputFormatClass(InputFormat.class);
127     InternalVertexRunner.run(conf, graph);
128 
129     assertEquals(2, Obs.preApp);
130     // 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
131     assertEquals(8, Obs.preSuperstep);
132     assertEquals(8, Obs.postSuperstep);
133     assertEquals(2, Obs.postApp);
134   }
135 }