This project has retired. For details, please refer to its Attic page.
      
 
ReducersBenchmark xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.benchmark;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Set;
25  
26  import org.apache.commons.cli.CommandLine;
27  import org.apache.giraph.conf.GiraphConfiguration;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.giraph.graph.BasicComputation;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
32  import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
33  import org.apache.giraph.master.DefaultMasterCompute;
34  import org.apache.giraph.reducers.ReduceSameTypeOperation;
35  import org.apache.giraph.worker.DefaultWorkerContext;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.io.DoubleWritable;
38  import org.apache.hadoop.io.LongWritable;
39  import org.apache.hadoop.util.ToolRunner;
40  
41  import com.google.common.collect.Sets;
42  
43  
44  
45  
46  public class ReducersBenchmark extends GiraphBenchmark {
47    
48    private static final String REDUCERS_NUM = "reducersbenchmark.num";
49  
50    
51    private static final BenchmarkOption REDUCERS =
52        new BenchmarkOption("r", "reducers",
53            true, "Reducers", "Need to set number of reducers (-r)");
54  
55    
56    public static class TestLongSumReducer
57        extends ReduceSameTypeOperation<LongWritable> {
58      
59      public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
60  
61      @Override
62      public LongWritable createInitialValue() {
63        return new LongWritable();
64      }
65  
66      @Override
67      public LongWritable reduce(
68          LongWritable curValue, LongWritable valueToReduce) {
69        curValue.set(curValue.get() + valueToReduce.get());
70        return curValue;
71      }
72  
73      @Override
74      public void readFields(DataInput in) throws IOException {
75      }
76  
77      @Override
78      public void write(DataOutput out) throws IOException {
79      }
80    }
81  
82    
83  
84  
85    public static class ReducersBenchmarkComputation extends
86        BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
87            DoubleWritable> {
88      @Override
89      public void compute(
90          Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
91          Iterable<DoubleWritable> messages) throws IOException {
92        int n = getNumReducers(getConf());
93        long superstep = getSuperstep();
94        int w = getWorkerContextReduced(getConf(), superstep);
95        for (int i = 0; i < n; i++) {
96          reduce("w" + i, new LongWritable((superstep + 1) * i));
97          reduce("p" + i, new LongWritable(i));
98  
99          if (superstep > 0) {
100           assertEquals(superstep * (getTotalNumVertices() * i) + w,
101               ((LongWritable) getBroadcast("w" + i)).get());
102           assertEquals(-(superstep * i),
103               ((LongWritable) getBroadcast("m" + i)).get());
104           assertEquals(superstep * getTotalNumVertices() * i,
105               ((LongWritable) getBroadcast("p" + i)).get());
106         }
107       }
108       if (superstep > 2) {
109         vertex.voteToHalt();
110       }
111     }
112   }
113 
114   
115 
116 
117   public static class ReducersBenchmarkMasterCompute extends
118       DefaultMasterCompute {
119     @Override
120     public void compute() {
121       int n = getNumReducers(getConf());
122       long superstep = getSuperstep();
123       int w = getWorkerContextReduced(getConf(), superstep);
124       for (int i = 0; i < n; i++) {
125         String wi = "w" + i;
126         String mi = "m" + i;
127         String pi = "p" + i;
128 
129         registerReducer(wi, TestLongSumReducer.INSTANCE);
130         registerReducer(mi, new TestLongSumReducer());
131 
132         if (superstep > 0) {
133           broadcast(wi, getReduced(wi));
134           broadcast(mi, new LongWritable(-superstep * i));
135           broadcast(pi, getReduced(pi));
136 
137           registerReducer(pi, new TestLongSumReducer(),
138               (LongWritable) getReduced(pi));
139 
140           assertEquals(superstep * (getTotalNumVertices() * i) + w,
141               ((LongWritable) getReduced(wi)).get());
142           assertEquals(superstep * getTotalNumVertices() * i,
143               ((LongWritable) getReduced(pi)).get());
144         } else {
145           registerReducer(pi, new TestLongSumReducer());
146         }
147       }
148     }
149   }
150 
151   
152 
153 
154   public static class ReducersBenchmarkWorkerContext
155       extends DefaultWorkerContext {
156     @Override
157     public void preSuperstep() {
158       addToWorkerReducers(1);
159       checkReducers();
160     }
161 
162     @Override
163     public void postSuperstep() {
164       addToWorkerReducers(2);
165       checkReducers();
166     }
167 
168     
169 
170 
171     private void checkReducers() {
172       int n = getNumReducers(getContext().getConfiguration());
173       long superstep = getSuperstep();
174       int w = getWorkerContextReduced(
175           getContext().getConfiguration(), superstep);
176       for (int i = 0; i < n; i++) {
177         if (superstep > 0) {
178           assertEquals(superstep * (getTotalNumVertices() * i) + w,
179               ((LongWritable) getBroadcast("w" + i)).get());
180           assertEquals(-(superstep * i),
181               ((LongWritable) getBroadcast("m" + i)).get());
182           assertEquals(superstep * getTotalNumVertices() * i,
183               ((LongWritable) getBroadcast("p" + i)).get());
184         }
185       }
186     }
187 
188     
189 
190 
191 
192 
193     private void addToWorkerReducers(int valueToAdd) {
194       int n = getNumReducers(getContext().getConfiguration());
195       for (int i = 0; i < n; i++) {
196         reduce("w" + i, new LongWritable(valueToAdd));
197       }
198     }
199   }
200 
201   
202 
203 
204 
205 
206 
207   private static int getNumReducers(Configuration conf) {
208     return conf.getInt(REDUCERS_NUM, 0);
209   }
210 
211   
212 
213 
214 
215 
216 
217 
218   private static int getWorkerContextReduced(Configuration conf,
219       long superstep) {
220     return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
221   }
222 
223   
224 
225 
226 
227 
228 
229   private static void assertEquals(long expected, long actual) {
230     if (expected != actual) {
231       throw new RuntimeException("expected: " + expected +
232           ", actual: " + actual);
233     }
234   }
235 
236   @Override
237   public Set<BenchmarkOption> getBenchmarkOptions() {
238     return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
239   }
240 
241   @Override
242   protected void prepareConfiguration(GiraphConfiguration conf,
243       CommandLine cmd) {
244     conf.setComputationClass(ReducersBenchmarkComputation.class);
245     conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
246     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
247     conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
248     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
249         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
250     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
251     conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
252     conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
253   }
254 
255   
256 
257 
258 
259 
260 
261   public static void main(final String[] args) throws Exception {
262     System.exit(ToolRunner.run(new ReducersBenchmark(), args));
263   }
264 }