This project has retired. For details please refer to its Attic page.
RandomMessageBenchmark xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.benchmark;
20  
21  import org.apache.commons.cli.CommandLine;
22  import org.apache.giraph.aggregators.LongSumAggregator;
23  import org.apache.giraph.graph.BasicComputation;
24  import org.apache.giraph.conf.GiraphConfiguration;
25  import org.apache.giraph.conf.GiraphConstants;
26  import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
27  import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
28  import org.apache.giraph.master.DefaultMasterCompute;
29  import org.apache.giraph.graph.Vertex;
30  import org.apache.giraph.worker.WorkerContext;
31  import org.apache.hadoop.io.BytesWritable;
32  import org.apache.hadoop.io.DoubleWritable;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.util.ToolRunner;
35  import org.apache.log4j.Logger;
36  
37  import com.google.common.collect.Sets;
38  
39  import java.io.IOException;
40  import java.util.Random;
41  import java.util.Set;
42  
43  /**
44   * Random Message Benchmark for evaluating the messaging performance.
45   */
46  public class RandomMessageBenchmark extends GiraphBenchmark {
47    /** How many supersteps to run */
48    public static final String SUPERSTEP_COUNT =
49        "giraph.randomMessageBenchmark.superstepCount";
50    /** How many bytes per message */
51    public static final String NUM_BYTES_PER_MESSAGE =
52        "giraph.randomMessageBenchmark.numBytesPerMessage";
53    /** Default bytes per message */
54    public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
55    /** How many messages per edge */
56    public static final String NUM_MESSAGES_PER_EDGE =
57        "giraph.randomMessageBenchmark.numMessagesPerEdge";
58    /** Default messages per edge */
59    public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
60    /** All bytes sent during this superstep */
61    public static final String AGG_SUPERSTEP_TOTAL_BYTES =
62        "superstep total bytes sent";
63    /** All bytes sent during this application */
64    public static final String AGG_TOTAL_BYTES = "total bytes sent";
65    /** All messages during this superstep */
66    public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
67        "superstep total messages";
68    /** All messages during this application */
69    public static final String AGG_TOTAL_MESSAGES = "total messages";
70    /** All millis during this superstep */
71    public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
72        "superstep total millis";
73    /** All millis during this application */
74    public static final String AGG_TOTAL_MILLIS = "total millis";
75    /** Workers for that superstep */
76    public static final String WORKERS_NUM = "workers";
77  
78    /** Option for number of bytes per message */
79    private static final BenchmarkOption BYTES_PER_MESSAGE = new BenchmarkOption(
80        "b", "bytes", true, "Message bytes per memssage",
81        "Need to set the number of message bytes (-b)");
82    /** Option for number of messages per edge */
83    private static final BenchmarkOption MESSAGES_PER_EDGE = new BenchmarkOption(
84        "n", "number", true, "Number of messages per edge",
85        "Need to set the number of messages per edge (-n)");
86    /** Option for number of flush threads */
87    private static final BenchmarkOption FLUSH_THREADS = new BenchmarkOption(
88        "f", "flusher", true, "Number of flush threads");
89  
90    /** Class logger */
91    private static final Logger LOG =
92      Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
93  
94    /**
95     * {@link WorkerContext} forRandomMessageBenchmark.
96     */
97    public static class RandomMessageBenchmarkWorkerContext extends
98        WorkerContext {
99      /** Class logger */
100     private static final Logger LOG =
101       Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
102     /** Bytes to be sent */
103     private byte[] messageBytes;
104     /** Number of messages sent per edge */
105     private int numMessagesPerEdge = -1;
106     /** Number of supersteps */
107     private int numSupersteps = -1;
108     /** Random generator for random bytes message */
109     private final Random random = new Random(System.currentTimeMillis());
110     /** Start superstep millis */
111     private long startSuperstepMillis = 0;
112     /** Total bytes */
113     private long totalBytes = 0;
114     /** Total messages */
115     private long totalMessages = 0;
116     /** Total millis */
117     private long totalMillis = 0;
118 
119     @Override
120     public void preApplication()
121       throws InstantiationException, IllegalAccessException {
122       messageBytes =
123         new byte[getContext().getConfiguration().
124                  getInt(NUM_BYTES_PER_MESSAGE,
125                  DEFAULT_NUM_BYTES_PER_MESSAGE)];
126       numMessagesPerEdge =
127           getContext().getConfiguration().
128           getInt(NUM_MESSAGES_PER_EDGE,
129               DEFAULT_NUM_MESSAGES_PER_EDGE);
130       numSupersteps = getContext().getConfiguration().
131           getInt(SUPERSTEP_COUNT, -1);
132     }
133 
134     @Override
135     public void preSuperstep() {
136       long superstepBytes = this.<LongWritable>
137           getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
138       long superstepMessages = this.<LongWritable>
139           getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
140       long superstepMillis = this.<LongWritable>
141           getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
142       long workers = this.<LongWritable>getAggregatedValue(WORKERS_NUM).get();
143 
144       // For timing and tracking the supersteps
145       // - superstep 0 starts the time, but cannot display any stats
146       //   since nothing has been aggregated yet
147       // - supersteps > 0 can display the stats
148       if (getSuperstep() == 0) {
149         startSuperstepMillis = System.currentTimeMillis();
150       } else {
151         totalBytes += superstepBytes;
152         totalMessages += superstepMessages;
153         totalMillis += superstepMillis;
154         double superstepMegabytesPerSecond =
155             superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
156         double megabytesPerSecond = totalBytes *
157             workers * 1000d / 1024d / 1024d / totalMillis;
158         double superstepMessagesPerSecond =
159             superstepMessages * workers * 1000d / superstepMillis;
160         double messagesPerSecond =
161             totalMessages * workers * 1000d / totalMillis;
162         if (LOG.isInfoEnabled()) {
163           LOG.info("Outputing statistics for superstep " + getSuperstep());
164           LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
165           LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
166           LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
167           LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
168           LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
169           LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
170           LOG.info(WORKERS_NUM + " : " + workers);
171           LOG.info("Superstep megabytes / second = " +
172               superstepMegabytesPerSecond);
173           LOG.info("Total megabytes / second = " +
174               megabytesPerSecond);
175           LOG.info("Superstep messages / second = " +
176               superstepMessagesPerSecond);
177           LOG.info("Total messages / second = " +
178               messagesPerSecond);
179           LOG.info("Superstep megabytes / second / worker = " +
180               superstepMegabytesPerSecond / workers);
181           LOG.info("Total megabytes / second / worker = " +
182               megabytesPerSecond / workers);
183           LOG.info("Superstep messages / second / worker = " +
184               superstepMessagesPerSecond / workers);
185           LOG.info("Total messages / second / worker = " +
186               messagesPerSecond / workers);
187         }
188       }
189 
190       aggregate(WORKERS_NUM, new LongWritable(1));
191     }
192 
193     @Override
194     public void postSuperstep() {
195       long endSuperstepMillis = System.currentTimeMillis();
196       long superstepMillis = endSuperstepMillis - startSuperstepMillis;
197       startSuperstepMillis = endSuperstepMillis;
198       aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
199     }
200 
201     @Override
202     public void postApplication() { }
203 
204     /**
205      * Get the message bytes to be used for sending.
206      *
207      * @return Byte array used for messages.
208      */
209     public byte[] getMessageBytes() {
210       return messageBytes;
211     }
212 
213     /**
214      * Get the number of edges per message.
215      *
216      * @return Messages per edge.
217      */
218     public int getNumMessagePerEdge() {
219       return numMessagesPerEdge;
220     }
221 
222     /**
223      * Get the number of supersteps.
224      *
225      * @return Number of supersteps.
226      */
227     public int getNumSupersteps() {
228       return numSupersteps;
229     }
230 
231     /**
232      * Randomize the message bytes.
233      */
234     public void randomizeMessageBytes() {
235       random.nextBytes(messageBytes);
236     }
237   }
238 
239   /**
240    * Master compute associated with {@link RandomMessageBenchmark}.
241    * It registers required aggregators.
242    */
243   public static class RandomMessageBenchmarkMasterCompute extends
244       DefaultMasterCompute {
245     @Override
246     public void initialize() throws InstantiationException,
247         IllegalAccessException {
248       registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
249           LongSumAggregator.class);
250       registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
251           LongSumAggregator.class);
252       registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
253           LongSumAggregator.class);
254       registerAggregator(WORKERS_NUM,
255           LongSumAggregator.class);
256     }
257   }
258 
259   /**
260    * Actual message computation (messaging in this case)
261    */
262   public static class RandomMessageComputation extends BasicComputation<
263       LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
264     @Override
265     public void compute(
266         Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
267         Iterable<BytesWritable> messages) throws IOException {
268       RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext();
269       if (getSuperstep() < workerContext.getNumSupersteps()) {
270         for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
271           workerContext.randomizeMessageBytes();
272           sendMessageToAllEdges(vertex,
273               new BytesWritable(workerContext.getMessageBytes()));
274           long bytesSent = workerContext.getMessageBytes().length *
275               vertex.getNumEdges();
276           aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
277           aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
278               new LongWritable(vertex.getNumEdges()));
279         }
280       } else {
281         vertex.voteToHalt();
282       }
283     }
284   }
285 
286   @Override
287   public Set<BenchmarkOption> getBenchmarkOptions() {
288     return Sets.newHashSet(BenchmarkOption.SUPERSTEPS,
289         BenchmarkOption.VERTICES, BenchmarkOption.EDGES_PER_VERTEX,
290         BYTES_PER_MESSAGE, MESSAGES_PER_EDGE, FLUSH_THREADS);
291   }
292 
293   @Override
294   protected void prepareConfiguration(GiraphConfiguration conf,
295       CommandLine cmd) {
296     conf.setComputationClass(RandomMessageComputation.class);
297     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
298     conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
299     conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
300     conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
301         BenchmarkOption.VERTICES.getOptionLongValue(cmd));
302     conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
303         BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
304     conf.setInt(SUPERSTEP_COUNT,
305         BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
306     conf.setInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
307         BYTES_PER_MESSAGE.getOptionIntValue(cmd));
308     conf.setInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
309         MESSAGES_PER_EDGE.getOptionIntValue(cmd));
310     if (FLUSH_THREADS.optionTurnedOn(cmd)) {
311       conf.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
312           FLUSH_THREADS.getOptionIntValue(cmd));
313     }
314   }
315 
316   /**
317    * Execute the benchmark.
318    *
319    * @param args Typically, this is the command line arguments.
320    * @throws Exception Any exception thrown during computation.
321    */
322   public static void main(String[] args) throws Exception {
323     System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
324   }
325 }