This project has retired. For details please refer to its
Attic page.
AggregatorsBenchmark xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.benchmark;
20
21 import java.io.IOException;
22 import java.util.Set;
23
24 import org.apache.commons.cli.CommandLine;
25 import org.apache.giraph.aggregators.LongSumAggregator;
26 import org.apache.giraph.conf.GiraphConfiguration;
27 import org.apache.giraph.conf.GiraphConstants;
28 import org.apache.giraph.graph.BasicComputation;
29 import org.apache.giraph.graph.Vertex;
30 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
31 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
32 import org.apache.giraph.master.DefaultMasterCompute;
33 import org.apache.giraph.utils.MasterLoggingAggregator;
34 import org.apache.giraph.worker.DefaultWorkerContext;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.io.DoubleWritable;
37 import org.apache.hadoop.io.LongWritable;
38 import org.apache.hadoop.util.ToolRunner;
39
40 import com.google.common.collect.Sets;
41
42
43
44
45 public class AggregatorsBenchmark extends GiraphBenchmark {
46
47 private static final String AGGREGATORS_NUM = "aggregatorsbenchmark.num";
48
49
50 private static final BenchmarkOption AGGREGATORS =
51 new BenchmarkOption("a", "aggregators",
52 true, "Aggregators", "Need to set number of aggregators (-a)");
53
54
55
56
57 public static class AggregatorsBenchmarkComputation extends
58 BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
59 DoubleWritable> {
60 @Override
61 public void compute(
62 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
63 Iterable<DoubleWritable> messages) throws IOException {
64 int n = getNumAggregators(getConf());
65 long superstep = getSuperstep();
66 int w = getWorkerContextAggregated(getConf(), superstep);
67 for (int i = 0; i < n; i++) {
68 aggregate("w" + i, new LongWritable((superstep + 1) * i));
69 aggregate("p" + i, new LongWritable(i));
70
71 assertEquals(superstep * (getTotalNumVertices() * i) + w,
72 ((LongWritable) getAggregatedValue("w" + i)).get());
73 assertEquals(-(superstep * i),
74 ((LongWritable) getAggregatedValue("m" + i)).get());
75 assertEquals(superstep * getTotalNumVertices() * i,
76 ((LongWritable) getAggregatedValue("p" + i)).get());
77 }
78 if (superstep > 2) {
79 vertex.voteToHalt();
80 }
81 }
82 }
83
84
85
86
87 public static class AggregatorsBenchmarkMasterCompute extends
88 DefaultMasterCompute {
89 @Override
90 public void initialize() throws InstantiationException,
91 IllegalAccessException {
92 int n = getNumAggregators(getConf());
93 for (int i = 0; i < n; i++) {
94 registerAggregator("w" + i, LongSumAggregator.class);
95 registerAggregator("m" + i, LongSumAggregator.class);
96 registerPersistentAggregator("p" + i, LongSumAggregator.class);
97 }
98 }
99
100 @Override
101 public void compute() {
102 int n = getNumAggregators(getConf());
103 long superstep = getSuperstep();
104 int w = getWorkerContextAggregated(getConf(), superstep);
105 for (int i = 0; i < n; i++) {
106 setAggregatedValue("m" + i, new LongWritable(-superstep * i));
107
108 if (superstep > 0) {
109 assertEquals(superstep * (getTotalNumVertices() * i) + w,
110 ((LongWritable) getAggregatedValue("w" + i)).get());
111 assertEquals(superstep * getTotalNumVertices() * i,
112 ((LongWritable) getAggregatedValue("p" + i)).get());
113 }
114 }
115 }
116 }
117
118
119
120
121 public static class AggregatorsBenchmarkWorkerContext
122 extends DefaultWorkerContext {
123 @Override
124 public void preSuperstep() {
125 addToWorkerAggregators(1);
126 checkAggregators();
127 MasterLoggingAggregator.aggregate("everything fine", this, getConf());
128 }
129
130 @Override
131 public void postSuperstep() {
132 addToWorkerAggregators(2);
133 checkAggregators();
134 }
135
136
137
138
139 private void checkAggregators() {
140 int n = getNumAggregators(getContext().getConfiguration());
141 long superstep = getSuperstep();
142 int w = getWorkerContextAggregated(
143 getContext().getConfiguration(), superstep);
144 for (int i = 0; i < n; i++) {
145 assertEquals(superstep * (getTotalNumVertices() * i) + w,
146 ((LongWritable) getAggregatedValue("w" + i)).get());
147 assertEquals(-(superstep * i),
148 ((LongWritable) getAggregatedValue("m" + i)).get());
149 assertEquals(superstep * getTotalNumVertices() * i,
150 ((LongWritable) getAggregatedValue("p" + i)).get());
151 }
152 }
153
154
155
156
157
158
159 private void addToWorkerAggregators(int valueToAdd) {
160 int n = getNumAggregators(getContext().getConfiguration());
161 for (int i = 0; i < n; i++) {
162 aggregate("w" + i, new LongWritable(valueToAdd));
163 }
164 }
165 }
166
167
168
169
170
171
172
173 private static int getNumAggregators(Configuration conf) {
174 return conf.getInt(AGGREGATORS_NUM, 0);
175 }
176
177
178
179
180
181
182
183
184 private static int getWorkerContextAggregated(Configuration conf,
185 long superstep) {
186 return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
187 }
188
189
190
191
192
193
194
195 private static void assertEquals(long expected, long actual) {
196 if (expected != actual) {
197 throw new RuntimeException("expected: " + expected +
198 ", actual: " + actual);
199 }
200 }
201
202 @Override
203 public Set<BenchmarkOption> getBenchmarkOptions() {
204 return Sets.newHashSet(BenchmarkOption.VERTICES, AGGREGATORS);
205 }
206
207 @Override
208 protected void prepareConfiguration(GiraphConfiguration conf,
209 CommandLine cmd) {
210 conf.setComputationClass(AggregatorsBenchmarkComputation.class);
211 conf.setMasterComputeClass(AggregatorsBenchmarkMasterCompute.class);
212 conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
213 conf.setWorkerContextClass(AggregatorsBenchmarkWorkerContext.class);
214 conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
215 BenchmarkOption.VERTICES.getOptionLongValue(cmd));
216 conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
217 conf.setInt(AGGREGATORS_NUM, AGGREGATORS.getOptionIntValue(cmd));
218 conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
219 MasterLoggingAggregator.setUseMasterLoggingAggregator(true, conf);
220 }
221
222
223
224
225
226
227
228 public static void main(final String[] args) throws Exception {
229 System.exit(ToolRunner.run(new AggregatorsBenchmark(), args));
230 }
231 }