This project has retired. For details please refer to its
Attic page.
TestMasterObserver xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.conf.GiraphConfiguration;
23 import org.apache.giraph.conf.GiraphConstants;
24 import org.apache.giraph.edge.ByteArrayEdges;
25 import org.apache.giraph.edge.Edge;
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.io.formats.TextVertexInputFormat;
28 import org.apache.giraph.utils.InternalVertexRunner;
29 import org.apache.hadoop.io.IntWritable;
30 import org.apache.hadoop.io.NullWritable;
31 import org.apache.hadoop.io.Text;
32 import org.apache.hadoop.mapreduce.InputSplit;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34 import org.junit.Test;
35
36 import java.io.IOException;
37
38 import static org.apache.hadoop.util.StringUtils.arrayToString;
39 import static org.junit.Assert.assertEquals;
40
41 import com.google.common.collect.ImmutableList;
42
43 public class TestMasterObserver {
44 public static class SimpleComputation extends BasicComputation<IntWritable,
45 IntWritable, NullWritable, NullWritable> {
46 @Override
47 public void compute(
48 Vertex<IntWritable, IntWritable, NullWritable> vertex,
49 Iterable<NullWritable> messages) throws IOException {
50 int currentValue = vertex.getValue().get();
51 if (currentValue == 2) {
52 vertex.voteToHalt();
53 }
54 vertex.setValue(new IntWritable(currentValue + 1));
55 }
56 }
57
58 public static class InputFormat extends TextVertexInputFormat<
59 IntWritable, IntWritable, NullWritable> {
60 @Override
61 public TextVertexReader createVertexReader(
62 InputSplit split, TaskAttemptContext context) throws IOException {
63 return new TextVertexReaderFromEachLine() {
64 @Override
65 protected IntWritable getId(Text line) throws IOException {
66 return new IntWritable(Integer.parseInt(line.toString()));
67 }
68
69 @Override
70 protected IntWritable getValue(Text line) throws IOException {
71 return new IntWritable(0);
72 }
73
74 @Override
75 protected Iterable<Edge<IntWritable, NullWritable>> getEdges(
76 Text line) throws IOException {
77 return ImmutableList.of();
78 }
79 };
80 }
81 }
82
83 public static class Obs extends DefaultMasterObserver {
84 public static int preApp = 0;
85 public static int preSuperstep = 0;
86 public static int postSuperstep = 0;
87 public static int postApp = 0;
88
89 @Override
90 public void preApplication() {
91 ++preApp;
92 }
93
94 @Override
95 public void postApplication() {
96 ++postApp;
97 }
98
99 @Override
100 public void preSuperstep(long superstep) {
101 ++preSuperstep;
102 }
103
104 @Override
105 public void postSuperstep(long superstep) {
106 ++postSuperstep;
107 }
108 }
109
110 @Test
111 public void testGetsCalled() throws Exception {
112 assertEquals(0, Obs.postApp);
113
114 String[] graph = new String[] { "1", "2", "3" };
115
116 String klasses[] = new String[] {
117 Obs.class.getName(),
118 Obs.class.getName()
119 };
120
121 GiraphConfiguration conf = new GiraphConfiguration();
122 conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
123 arrayToString(klasses));
124 conf.setComputationClass(SimpleComputation.class);
125 conf.setOutEdgesClass(ByteArrayEdges.class);
126 conf.setVertexInputFormatClass(InputFormat.class);
127 InternalVertexRunner.run(conf, graph);
128
129 assertEquals(2, Obs.preApp);
130
131 assertEquals(8, Obs.preSuperstep);
132 assertEquals(8, Obs.postSuperstep);
133 assertEquals(2, Obs.postApp);
134 }
135 }