This project has retired. For details please refer to its Attic page.
ReducersBenchmark xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.benchmark;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Set;
25  
26  import org.apache.commons.cli.CommandLine;
27  import org.apache.giraph.conf.GiraphConfiguration;
28  import org.apache.giraph.conf.GiraphConstants;
29  import org.apache.giraph.graph.BasicComputation;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
32  import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
33  import org.apache.giraph.master.DefaultMasterCompute;
34  import org.apache.giraph.reducers.ReduceSameTypeOperation;
35  import org.apache.giraph.worker.DefaultWorkerContext;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.io.DoubleWritable;
38  import org.apache.hadoop.io.LongWritable;
39  import org.apache.hadoop.util.ToolRunner;
40  
41  import com.google.common.collect.Sets;
42  
43  /**
44   * Benchmark for reducers. Also checks the correctness.
45   */
46  public class ReducersBenchmark extends GiraphBenchmark {
47    /** Number of reducers setting */
48    private static final String REDUCERS_NUM = "reducersbenchmark.num";
49  
50    /** Option for number of reducers */
51    private static final BenchmarkOption REDUCERS =
52        new BenchmarkOption("r", "reducers",
53            true, "Reducers", "Need to set number of reducers (-r)");
54  
55    /** LongSumReducer */
56    public static class TestLongSumReducer
57        extends ReduceSameTypeOperation<LongWritable> {
58      /** Singleton */
59      public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
60  
61      @Override
62      public LongWritable createInitialValue() {
63        return new LongWritable();
64      }
65  
66      @Override
67      public LongWritable reduce(
68          LongWritable curValue, LongWritable valueToReduce) {
69        curValue.set(curValue.get() + valueToReduce.get());
70        return curValue;
71      }
72  
73      @Override
74      public void readFields(DataInput in) throws IOException {
75      }
76  
77      @Override
78      public void write(DataOutput out) throws IOException {
79      }
80    }
81  
82    /**
83     * Vertex class for ReducersBenchmark
84     */
85    public static class ReducersBenchmarkComputation extends
86        BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
87            DoubleWritable> {
88      @Override
89      public void compute(
90          Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
91          Iterable<DoubleWritable> messages) throws IOException {
92        int n = getNumReducers(getConf());
93        long superstep = getSuperstep();
94        int w = getWorkerContextReduced(getConf(), superstep);
95        for (int i = 0; i < n; i++) {
96          reduce("w" + i, new LongWritable((superstep + 1) * i));
97          reduce("p" + i, new LongWritable(i));
98  
99          if (superstep > 0) {
100           assertEquals(superstep * (getTotalNumVertices() * i) + w,
101               ((LongWritable) getBroadcast("w" + i)).get());
102           assertEquals(-(superstep * i),
103               ((LongWritable) getBroadcast("m" + i)).get());
104           assertEquals(superstep * getTotalNumVertices() * i,
105               ((LongWritable) getBroadcast("p" + i)).get());
106         }
107       }
108       if (superstep > 2) {
109         vertex.voteToHalt();
110       }
111     }
112   }
113 
114   /**
115    * MasterCompute class for ReducersBenchmark
116    */
117   public static class ReducersBenchmarkMasterCompute extends
118       DefaultMasterCompute {
119     @Override
120     public void compute() {
121       int n = getNumReducers(getConf());
122       long superstep = getSuperstep();
123       int w = getWorkerContextReduced(getConf(), superstep);
124       for (int i = 0; i < n; i++) {
125         String wi = "w" + i;
126         String mi = "m" + i;
127         String pi = "p" + i;
128 
129         registerReducer(wi, TestLongSumReducer.INSTANCE);
130         registerReducer(mi, new TestLongSumReducer());
131 
132         if (superstep > 0) {
133           broadcast(wi, getReduced(wi));
134           broadcast(mi, new LongWritable(-superstep * i));
135           broadcast(pi, getReduced(pi));
136 
137           registerReducer(pi, new TestLongSumReducer(),
138               (LongWritable) getReduced(pi));
139 
140           assertEquals(superstep * (getTotalNumVertices() * i) + w,
141               ((LongWritable) getReduced(wi)).get());
142           assertEquals(superstep * getTotalNumVertices() * i,
143               ((LongWritable) getReduced(pi)).get());
144         } else {
145           registerReducer(pi, new TestLongSumReducer());
146         }
147       }
148     }
149   }
150 
151   /**
152    * WorkerContext class for ReducersBenchmark
153    */
154   public static class ReducersBenchmarkWorkerContext
155       extends DefaultWorkerContext {
156     @Override
157     public void preSuperstep() {
158       addToWorkerReducers(1);
159       checkReducers();
160     }
161 
162     @Override
163     public void postSuperstep() {
164       addToWorkerReducers(2);
165       checkReducers();
166     }
167 
168     /**
169      * Check if reducer values are correct for current superstep
170      */
171     private void checkReducers() {
172       int n = getNumReducers(getContext().getConfiguration());
173       long superstep = getSuperstep();
174       int w = getWorkerContextReduced(
175           getContext().getConfiguration(), superstep);
176       for (int i = 0; i < n; i++) {
177         if (superstep > 0) {
178           assertEquals(superstep * (getTotalNumVertices() * i) + w,
179               ((LongWritable) getBroadcast("w" + i)).get());
180           assertEquals(-(superstep * i),
181               ((LongWritable) getBroadcast("m" + i)).get());
182           assertEquals(superstep * getTotalNumVertices() * i,
183               ((LongWritable) getBroadcast("p" + i)).get());
184         }
185       }
186     }
187 
188     /**
189      * Add some value to worker reducers.
190      *
191      * @param valueToAdd Which value to add
192      */
193     private void addToWorkerReducers(int valueToAdd) {
194       int n = getNumReducers(getContext().getConfiguration());
195       for (int i = 0; i < n; i++) {
196         reduce("w" + i, new LongWritable(valueToAdd));
197       }
198     }
199   }
200 
201   /**
202    * Get the number of reducers from configuration
203    *
204    * @param conf Configuration
205    * @return Number of reducers
206    */
207   private static int getNumReducers(Configuration conf) {
208     return conf.getInt(REDUCERS_NUM, 0);
209   }
210 
211   /**
212    * Get the value which should be reduced by worker context
213    *
214    * @param conf Configuration
215    * @param superstep Superstep
216    * @return The value which should be reduced by worker context
217    */
218   private static int getWorkerContextReduced(Configuration conf,
219       long superstep) {
220     return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
221   }
222 
223   /**
224    * Check if values are equal, throw an exception if they aren't
225    *
226    * @param expected Expected value
227    * @param actual Actual value
228    */
229   private static void assertEquals(long expected, long actual) {
230     if (expected != actual) {
231       throw new RuntimeException("expected: " + expected +
232           ", actual: " + actual);
233     }
234   }
235 
236   @Override
237   public Set<BenchmarkOption> getBenchmarkOptions() {
238     return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
239   }
240 
241   @Override
242   protected void prepareConfiguration(GiraphConfiguration conf,
243       CommandLine cmd) {
244     conf.setComputationClass(ReducersBenchmarkComputation.class);
245     conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
246     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
247     conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
248     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
249         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
250     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
251     conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
252     conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
253   }
254 
255   /**
256    * Execute the benchmark.
257    *
258    * @param args Typically the command line arguments.
259    * @throws Exception Any exception from the computation.
260    */
261   public static void main(final String[] args) throws Exception {
262     System.exit(ToolRunner.run(new ReducersBenchmark(), args));
263   }
264 }