This project has retired. For details please refer to its
Attic page.
RandomMessageBenchmark xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import org.apache.commons.cli.CommandLine;
22 import org.apache.giraph.aggregators.LongSumAggregator;
23 import org.apache.giraph.graph.BasicComputation;
24 import org.apache.giraph.conf.GiraphConfiguration;
25 import org.apache.giraph.conf.GiraphConstants;
26 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
27 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
28 import org.apache.giraph.master.DefaultMasterCompute;
29 import org.apache.giraph.graph.Vertex;
30 import org.apache.giraph.worker.WorkerContext;
31 import org.apache.hadoop.io.BytesWritable;
32 import org.apache.hadoop.io.DoubleWritable;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.util.ToolRunner;
35 import org.apache.log4j.Logger;
36
37 import com.google.common.collect.Sets;
38
39 import java.io.IOException;
40 import java.util.Random;
41 import java.util.Set;
42
43
44
45
46 public class RandomMessageBenchmark extends GiraphBenchmark {
47
48 public static final String SUPERSTEP_COUNT =
49 "giraph.randomMessageBenchmark.superstepCount";
50
51 public static final String NUM_BYTES_PER_MESSAGE =
52 "giraph.randomMessageBenchmark.numBytesPerMessage";
53
54 public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
55
56 public static final String NUM_MESSAGES_PER_EDGE =
57 "giraph.randomMessageBenchmark.numMessagesPerEdge";
58
59 public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
60
61 public static final String AGG_SUPERSTEP_TOTAL_BYTES =
62 "superstep total bytes sent";
63
64 public static final String AGG_TOTAL_BYTES = "total bytes sent";
65
66 public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
67 "superstep total messages";
68
69 public static final String AGG_TOTAL_MESSAGES = "total messages";
70
71 public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
72 "superstep total millis";
73
74 public static final String AGG_TOTAL_MILLIS = "total millis";
75
76 public static final String WORKERS_NUM = "workers";
77
78
79 private static final BenchmarkOption BYTES_PER_MESSAGE = new BenchmarkOption(
80 "b", "bytes", true, "Message bytes per memssage",
81 "Need to set the number of message bytes (-b)");
82
83 private static final BenchmarkOption MESSAGES_PER_EDGE = new BenchmarkOption(
84 "n", "number", true, "Number of messages per edge",
85 "Need to set the number of messages per edge (-n)");
86
87 private static final BenchmarkOption FLUSH_THREADS = new BenchmarkOption(
88 "f", "flusher", true, "Number of flush threads");
89
90
91 private static final Logger LOG =
92 Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
93
94
95
96
97 public static class RandomMessageBenchmarkWorkerContext extends
98 WorkerContext {
99
100 private static final Logger LOG =
101 Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
102
103 private byte[] messageBytes;
104
105 private int numMessagesPerEdge = -1;
106
107 private int numSupersteps = -1;
108
109 private final Random random = new Random(System.currentTimeMillis());
110
111 private long startSuperstepMillis = 0;
112
113 private long totalBytes = 0;
114
115 private long totalMessages = 0;
116
117 private long totalMillis = 0;
118
119 @Override
120 public void preApplication()
121 throws InstantiationException, IllegalAccessException {
122 messageBytes =
123 new byte[getContext().getConfiguration().
124 getInt(NUM_BYTES_PER_MESSAGE,
125 DEFAULT_NUM_BYTES_PER_MESSAGE)];
126 numMessagesPerEdge =
127 getContext().getConfiguration().
128 getInt(NUM_MESSAGES_PER_EDGE,
129 DEFAULT_NUM_MESSAGES_PER_EDGE);
130 numSupersteps = getContext().getConfiguration().
131 getInt(SUPERSTEP_COUNT, -1);
132 }
133
134 @Override
135 public void preSuperstep() {
136 long superstepBytes = this.<LongWritable>
137 getAggregatedValue(AGG_SUPERSTEP_TOTAL_BYTES).get();
138 long superstepMessages = this.<LongWritable>
139 getAggregatedValue(AGG_SUPERSTEP_TOTAL_MESSAGES).get();
140 long superstepMillis = this.<LongWritable>
141 getAggregatedValue(AGG_SUPERSTEP_TOTAL_MILLIS).get();
142 long workers = this.<LongWritable>getAggregatedValue(WORKERS_NUM).get();
143
144
145
146
147
148 if (getSuperstep() == 0) {
149 startSuperstepMillis = System.currentTimeMillis();
150 } else {
151 totalBytes += superstepBytes;
152 totalMessages += superstepMessages;
153 totalMillis += superstepMillis;
154 double superstepMegabytesPerSecond =
155 superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
156 double megabytesPerSecond = totalBytes *
157 workers * 1000d / 1024d / 1024d / totalMillis;
158 double superstepMessagesPerSecond =
159 superstepMessages * workers * 1000d / superstepMillis;
160 double messagesPerSecond =
161 totalMessages * workers * 1000d / totalMillis;
162 if (LOG.isInfoEnabled()) {
163 LOG.info("Outputing statistics for superstep " + getSuperstep());
164 LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + superstepBytes);
165 LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
166 LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + superstepMessages);
167 LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
168 LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + superstepMillis);
169 LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
170 LOG.info(WORKERS_NUM + " : " + workers);
171 LOG.info("Superstep megabytes / second = " +
172 superstepMegabytesPerSecond);
173 LOG.info("Total megabytes / second = " +
174 megabytesPerSecond);
175 LOG.info("Superstep messages / second = " +
176 superstepMessagesPerSecond);
177 LOG.info("Total messages / second = " +
178 messagesPerSecond);
179 LOG.info("Superstep megabytes / second / worker = " +
180 superstepMegabytesPerSecond / workers);
181 LOG.info("Total megabytes / second / worker = " +
182 megabytesPerSecond / workers);
183 LOG.info("Superstep messages / second / worker = " +
184 superstepMessagesPerSecond / workers);
185 LOG.info("Total messages / second / worker = " +
186 messagesPerSecond / workers);
187 }
188 }
189
190 aggregate(WORKERS_NUM, new LongWritable(1));
191 }
192
193 @Override
194 public void postSuperstep() {
195 long endSuperstepMillis = System.currentTimeMillis();
196 long superstepMillis = endSuperstepMillis - startSuperstepMillis;
197 startSuperstepMillis = endSuperstepMillis;
198 aggregate(AGG_SUPERSTEP_TOTAL_MILLIS, new LongWritable(superstepMillis));
199 }
200
201 @Override
202 public void postApplication() { }
203
204
205
206
207
208
209 public byte[] getMessageBytes() {
210 return messageBytes;
211 }
212
213
214
215
216
217
218 public int getNumMessagePerEdge() {
219 return numMessagesPerEdge;
220 }
221
222
223
224
225
226
227 public int getNumSupersteps() {
228 return numSupersteps;
229 }
230
231
232
233
234 public void randomizeMessageBytes() {
235 random.nextBytes(messageBytes);
236 }
237 }
238
239
240
241
242
243 public static class RandomMessageBenchmarkMasterCompute extends
244 DefaultMasterCompute {
245 @Override
246 public void initialize() throws InstantiationException,
247 IllegalAccessException {
248 registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
249 LongSumAggregator.class);
250 registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
251 LongSumAggregator.class);
252 registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
253 LongSumAggregator.class);
254 registerAggregator(WORKERS_NUM,
255 LongSumAggregator.class);
256 }
257 }
258
259
260
261
262 public static class RandomMessageComputation extends BasicComputation<
263 LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
264 @Override
265 public void compute(
266 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
267 Iterable<BytesWritable> messages) throws IOException {
268 RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext();
269 if (getSuperstep() < workerContext.getNumSupersteps()) {
270 for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
271 workerContext.randomizeMessageBytes();
272 sendMessageToAllEdges(vertex,
273 new BytesWritable(workerContext.getMessageBytes()));
274 long bytesSent = workerContext.getMessageBytes().length *
275 vertex.getNumEdges();
276 aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
277 aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
278 new LongWritable(vertex.getNumEdges()));
279 }
280 } else {
281 vertex.voteToHalt();
282 }
283 }
284 }
285
286 @Override
287 public Set<BenchmarkOption> getBenchmarkOptions() {
288 return Sets.newHashSet(BenchmarkOption.SUPERSTEPS,
289 BenchmarkOption.VERTICES, BenchmarkOption.EDGES_PER_VERTEX,
290 BYTES_PER_MESSAGE, MESSAGES_PER_EDGE, FLUSH_THREADS);
291 }
292
293 @Override
294 protected void prepareConfiguration(GiraphConfiguration conf,
295 CommandLine cmd) {
296 conf.setComputationClass(RandomMessageComputation.class);
297 conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
298 conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
299 conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
300 conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
301 BenchmarkOption.VERTICES.getOptionLongValue(cmd));
302 conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
303 BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
304 conf.setInt(SUPERSTEP_COUNT,
305 BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
306 conf.setInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
307 BYTES_PER_MESSAGE.getOptionIntValue(cmd));
308 conf.setInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
309 MESSAGES_PER_EDGE.getOptionIntValue(cmd));
310 if (FLUSH_THREADS.optionTurnedOn(cmd)) {
311 conf.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
312 FLUSH_THREADS.getOptionIntValue(cmd));
313 }
314 }
315
316
317
318
319
320
321
322 public static void main(String[] args) throws Exception {
323 System.exit(ToolRunner.run(new RandomMessageBenchmark(), args));
324 }
325 }