This project has retired. For details please refer to its Attic page.
AggregatorsBenchmark xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.benchmark;
20  
21  import java.io.IOException;
22  import java.util.Set;
23  
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.giraph.aggregators.LongSumAggregator;
26  import org.apache.giraph.conf.GiraphConfiguration;
27  import org.apache.giraph.conf.GiraphConstants;
28  import org.apache.giraph.graph.BasicComputation;
29  import org.apache.giraph.graph.Vertex;
30  import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
31  import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
32  import org.apache.giraph.master.DefaultMasterCompute;
33  import org.apache.giraph.utils.MasterLoggingAggregator;
34  import org.apache.giraph.worker.DefaultWorkerContext;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.io.DoubleWritable;
37  import org.apache.hadoop.io.LongWritable;
38  import org.apache.hadoop.util.ToolRunner;
39  
40  import com.google.common.collect.Sets;
41  
42  /**
43   * Benchmark for aggregators. Also checks the correctness.
44   */
45  public class AggregatorsBenchmark extends GiraphBenchmark {
46    /** Number of aggregators setting */
47    private static final String AGGREGATORS_NUM = "aggregatorsbenchmark.num";
48  
49    /** Option for number of aggregators */
50    private static final BenchmarkOption AGGREGATORS =
51        new BenchmarkOption("a", "aggregators",
52            true, "Aggregators", "Need to set number of aggregators (-a)");
53  
54    /**
55     * Vertex class for AggregatorsBenchmark
56     */
57    public static class AggregatorsBenchmarkComputation extends
58        BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
59            DoubleWritable> {
60      @Override
61      public void compute(
62          Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
63          Iterable<DoubleWritable> messages) throws IOException {
64        int n = getNumAggregators(getConf());
65        long superstep = getSuperstep();
66        int w = getWorkerContextAggregated(getConf(), superstep);
67        for (int i = 0; i < n; i++) {
68          aggregate("w" + i, new LongWritable((superstep + 1) * i));
69          aggregate("p" + i, new LongWritable(i));
70  
71          assertEquals(superstep * (getTotalNumVertices() * i) + w,
72              ((LongWritable) getAggregatedValue("w" + i)).get());
73          assertEquals(-(superstep * i),
74              ((LongWritable) getAggregatedValue("m" + i)).get());
75          assertEquals(superstep * getTotalNumVertices() * i,
76              ((LongWritable) getAggregatedValue("p" + i)).get());
77        }
78        if (superstep > 2) {
79          vertex.voteToHalt();
80        }
81      }
82    }
83  
84    /**
85     * MasterCompute class for AggregatorsBenchmark
86     */
87    public static class AggregatorsBenchmarkMasterCompute extends
88        DefaultMasterCompute {
89      @Override
90      public void initialize() throws InstantiationException,
91          IllegalAccessException {
92        int n = getNumAggregators(getConf());
93        for (int i = 0; i < n; i++) {
94          registerAggregator("w" + i, LongSumAggregator.class);
95          registerAggregator("m" + i, LongSumAggregator.class);
96          registerPersistentAggregator("p" + i, LongSumAggregator.class);
97        }
98      }
99  
100     @Override
101     public void compute() {
102       int n = getNumAggregators(getConf());
103       long superstep = getSuperstep();
104       int w = getWorkerContextAggregated(getConf(), superstep);
105       for (int i = 0; i < n; i++) {
106         setAggregatedValue("m" + i, new LongWritable(-superstep * i));
107 
108         if (superstep > 0) {
109           assertEquals(superstep * (getTotalNumVertices() * i) + w,
110               ((LongWritable) getAggregatedValue("w" + i)).get());
111           assertEquals(superstep * getTotalNumVertices() * i,
112               ((LongWritable) getAggregatedValue("p" + i)).get());
113         }
114       }
115     }
116   }
117 
118   /**
119    * WorkerContext class for AggregatorsBenchmark
120    */
121   public static class AggregatorsBenchmarkWorkerContext
122       extends DefaultWorkerContext {
123     @Override
124     public void preSuperstep() {
125       addToWorkerAggregators(1);
126       checkAggregators();
127       MasterLoggingAggregator.aggregate("everything fine", this, getConf());
128     }
129 
130     @Override
131     public void postSuperstep() {
132       addToWorkerAggregators(2);
133       checkAggregators();
134     }
135 
136     /**
137      * Check if aggregator values are correct for current superstep
138      */
139     private void checkAggregators() {
140       int n = getNumAggregators(getContext().getConfiguration());
141       long superstep = getSuperstep();
142       int w = getWorkerContextAggregated(
143           getContext().getConfiguration(), superstep);
144       for (int i = 0; i < n; i++) {
145         assertEquals(superstep * (getTotalNumVertices() * i) + w,
146             ((LongWritable) getAggregatedValue("w" + i)).get());
147         assertEquals(-(superstep * i),
148             ((LongWritable) getAggregatedValue("m" + i)).get());
149         assertEquals(superstep * getTotalNumVertices() * i,
150             ((LongWritable) getAggregatedValue("p" + i)).get());
151       }
152     }
153 
154     /**
155      * Add some value to worker aggregators.
156      *
157      * @param valueToAdd Which value to add
158      */
159     private void addToWorkerAggregators(int valueToAdd) {
160       int n = getNumAggregators(getContext().getConfiguration());
161       for (int i = 0; i < n; i++) {
162         aggregate("w" + i, new LongWritable(valueToAdd));
163       }
164     }
165   }
166 
167   /**
168    * Get the number of aggregators from configuration
169    *
170    * @param conf Configuration
171    * @return Number of aggregators
172    */
173   private static int getNumAggregators(Configuration conf) {
174     return conf.getInt(AGGREGATORS_NUM, 0);
175   }
176 
177   /**
178    * Get the value which should be aggreagted by worker context
179    *
180    * @param conf Configuration
181    * @param superstep Superstep
182    * @return The value which should be aggregated by worker context
183    */
184   private static int getWorkerContextAggregated(Configuration conf,
185       long superstep) {
186     return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
187   }
188 
189   /**
190    * Check if values are equal, throw an exception if they aren't
191    *
192    * @param expected Expected value
193    * @param actual Actual value
194    */
195   private static void assertEquals(long expected, long actual) {
196     if (expected != actual) {
197       throw new RuntimeException("expected: " + expected +
198           ", actual: " + actual);
199     }
200   }
201 
202   @Override
203   public Set<BenchmarkOption> getBenchmarkOptions() {
204     return Sets.newHashSet(BenchmarkOption.VERTICES, AGGREGATORS);
205   }
206 
207   @Override
208   protected void prepareConfiguration(GiraphConfiguration conf,
209       CommandLine cmd) {
210     conf.setComputationClass(AggregatorsBenchmarkComputation.class);
211     conf.setMasterComputeClass(AggregatorsBenchmarkMasterCompute.class);
212     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
213     conf.setWorkerContextClass(AggregatorsBenchmarkWorkerContext.class);
214     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
215         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
216     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
217     conf.setInt(AGGREGATORS_NUM, AGGREGATORS.getOptionIntValue(cmd));
218     conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
219     MasterLoggingAggregator.setUseMasterLoggingAggregator(true, conf);
220   }
221 
222   /**
223    * Execute the benchmark.
224    *
225    * @param args Typically the command line arguments.
226    * @throws Exception Any exception from the computation.
227    */
228   public static void main(final String[] args) throws Exception {
229     System.exit(ToolRunner.run(new AggregatorsBenchmark(), args));
230   }
231 }