This project has retired. For details please refer to its
Attic page.
ReducersBenchmark xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.Set;
25
26 import org.apache.commons.cli.CommandLine;
27 import org.apache.giraph.conf.GiraphConfiguration;
28 import org.apache.giraph.conf.GiraphConstants;
29 import org.apache.giraph.graph.BasicComputation;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
32 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
33 import org.apache.giraph.master.DefaultMasterCompute;
34 import org.apache.giraph.reducers.ReduceSameTypeOperation;
35 import org.apache.giraph.worker.DefaultWorkerContext;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.io.DoubleWritable;
38 import org.apache.hadoop.io.LongWritable;
39 import org.apache.hadoop.util.ToolRunner;
40
41 import com.google.common.collect.Sets;
42
43
44
45
46 public class ReducersBenchmark extends GiraphBenchmark {
47
48 private static final String REDUCERS_NUM = "reducersbenchmark.num";
49
50
51 private static final BenchmarkOption REDUCERS =
52 new BenchmarkOption("r", "reducers",
53 true, "Reducers", "Need to set number of reducers (-r)");
54
55
56 public static class TestLongSumReducer
57 extends ReduceSameTypeOperation<LongWritable> {
58
59 public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
60
61 @Override
62 public LongWritable createInitialValue() {
63 return new LongWritable();
64 }
65
66 @Override
67 public LongWritable reduce(
68 LongWritable curValue, LongWritable valueToReduce) {
69 curValue.set(curValue.get() + valueToReduce.get());
70 return curValue;
71 }
72
73 @Override
74 public void readFields(DataInput in) throws IOException {
75 }
76
77 @Override
78 public void write(DataOutput out) throws IOException {
79 }
80 }
81
82
83
84
85 public static class ReducersBenchmarkComputation extends
86 BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
87 DoubleWritable> {
88 @Override
89 public void compute(
90 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
91 Iterable<DoubleWritable> messages) throws IOException {
92 int n = getNumReducers(getConf());
93 long superstep = getSuperstep();
94 int w = getWorkerContextReduced(getConf(), superstep);
95 for (int i = 0; i < n; i++) {
96 reduce("w" + i, new LongWritable((superstep + 1) * i));
97 reduce("p" + i, new LongWritable(i));
98
99 if (superstep > 0) {
100 assertEquals(superstep * (getTotalNumVertices() * i) + w,
101 ((LongWritable) getBroadcast("w" + i)).get());
102 assertEquals(-(superstep * i),
103 ((LongWritable) getBroadcast("m" + i)).get());
104 assertEquals(superstep * getTotalNumVertices() * i,
105 ((LongWritable) getBroadcast("p" + i)).get());
106 }
107 }
108 if (superstep > 2) {
109 vertex.voteToHalt();
110 }
111 }
112 }
113
114
115
116
117 public static class ReducersBenchmarkMasterCompute extends
118 DefaultMasterCompute {
119 @Override
120 public void compute() {
121 int n = getNumReducers(getConf());
122 long superstep = getSuperstep();
123 int w = getWorkerContextReduced(getConf(), superstep);
124 for (int i = 0; i < n; i++) {
125 String wi = "w" + i;
126 String mi = "m" + i;
127 String pi = "p" + i;
128
129 registerReducer(wi, TestLongSumReducer.INSTANCE);
130 registerReducer(mi, new TestLongSumReducer());
131
132 if (superstep > 0) {
133 broadcast(wi, getReduced(wi));
134 broadcast(mi, new LongWritable(-superstep * i));
135 broadcast(pi, getReduced(pi));
136
137 registerReducer(pi, new TestLongSumReducer(),
138 (LongWritable) getReduced(pi));
139
140 assertEquals(superstep * (getTotalNumVertices() * i) + w,
141 ((LongWritable) getReduced(wi)).get());
142 assertEquals(superstep * getTotalNumVertices() * i,
143 ((LongWritable) getReduced(pi)).get());
144 } else {
145 registerReducer(pi, new TestLongSumReducer());
146 }
147 }
148 }
149 }
150
151
152
153
154 public static class ReducersBenchmarkWorkerContext
155 extends DefaultWorkerContext {
156 @Override
157 public void preSuperstep() {
158 addToWorkerReducers(1);
159 checkReducers();
160 }
161
162 @Override
163 public void postSuperstep() {
164 addToWorkerReducers(2);
165 checkReducers();
166 }
167
168
169
170
171 private void checkReducers() {
172 int n = getNumReducers(getContext().getConfiguration());
173 long superstep = getSuperstep();
174 int w = getWorkerContextReduced(
175 getContext().getConfiguration(), superstep);
176 for (int i = 0; i < n; i++) {
177 if (superstep > 0) {
178 assertEquals(superstep * (getTotalNumVertices() * i) + w,
179 ((LongWritable) getBroadcast("w" + i)).get());
180 assertEquals(-(superstep * i),
181 ((LongWritable) getBroadcast("m" + i)).get());
182 assertEquals(superstep * getTotalNumVertices() * i,
183 ((LongWritable) getBroadcast("p" + i)).get());
184 }
185 }
186 }
187
188
189
190
191
192
193 private void addToWorkerReducers(int valueToAdd) {
194 int n = getNumReducers(getContext().getConfiguration());
195 for (int i = 0; i < n; i++) {
196 reduce("w" + i, new LongWritable(valueToAdd));
197 }
198 }
199 }
200
201
202
203
204
205
206
207 private static int getNumReducers(Configuration conf) {
208 return conf.getInt(REDUCERS_NUM, 0);
209 }
210
211
212
213
214
215
216
217
218 private static int getWorkerContextReduced(Configuration conf,
219 long superstep) {
220 return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
221 }
222
223
224
225
226
227
228
229 private static void assertEquals(long expected, long actual) {
230 if (expected != actual) {
231 throw new RuntimeException("expected: " + expected +
232 ", actual: " + actual);
233 }
234 }
235
236 @Override
237 public Set<BenchmarkOption> getBenchmarkOptions() {
238 return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
239 }
240
241 @Override
242 protected void prepareConfiguration(GiraphConfiguration conf,
243 CommandLine cmd) {
244 conf.setComputationClass(ReducersBenchmarkComputation.class);
245 conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
246 conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
247 conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
248 conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
249 BenchmarkOption.VERTICES.getOptionLongValue(cmd));
250 conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
251 conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
252 conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
253 }
254
255
256
257
258
259
260
261 public static void main(final String[] args) throws Exception {
262 System.exit(ToolRunner.run(new ReducersBenchmark(), args));
263 }
264 }