This project has retired. For details please refer to its
Attic page.
InputSplitsCallable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.ByteArrayInputStream;
22 import java.io.DataInputStream;
23 import java.io.IOException;
24 import java.util.concurrent.Callable;
25
26 import org.apache.giraph.bsp.CentralizedServiceWorker;
27 import org.apache.giraph.comm.WorkerClientRequestProcessor;
28 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30 import org.apache.giraph.graph.VertexEdgeCount;
31 import org.apache.giraph.io.GiraphInputFormat;
32 import org.apache.giraph.io.InputType;
33 import org.apache.giraph.metrics.GiraphMetrics;
34 import org.apache.giraph.metrics.GiraphMetricsRegistry;
35 import org.apache.giraph.metrics.MeterDesc;
36 import org.apache.giraph.metrics.MetricNames;
37 import org.apache.giraph.ooc.OutOfCoreEngine;
38 import org.apache.giraph.time.SystemTime;
39 import org.apache.giraph.time.Time;
40 import org.apache.giraph.time.Times;
41 import org.apache.hadoop.io.Writable;
42 import org.apache.hadoop.io.WritableComparable;
43 import org.apache.hadoop.mapreduce.InputSplit;
44 import org.apache.hadoop.mapreduce.Mapper;
45 import org.apache.log4j.Logger;
46
47 import com.yammer.metrics.core.Counter;
48 import com.yammer.metrics.core.Meter;
49 import com.yammer.metrics.util.PercentGauge;
50
51
52
53
54
55
56
57
58
59
60 public abstract class InputSplitsCallable<I extends WritableComparable,
61 V extends Writable, E extends Writable>
62 implements Callable<VertexEdgeCount> {
63
64 private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
65
66 private static final Time TIME = SystemTime.get();
67
68 protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
69
70 protected final Mapper<?, ?, ?, ?>.Context context;
71
72 protected final WorkerClientRequestProcessor<I, V, E>
73 workerClientRequestProcessor;
74
75
76
77
78 private final WorkerInputSplitsHandler splitsHandler;
79
80 private final long startNanos = TIME.getNanoseconds();
81
82 private final boolean useLocality;
83
84 private final CentralizedServiceWorker<I, V, E> serviceWorker;
85
86
87
88
89
90
91
92
93
94 public InputSplitsCallable(
95 Mapper<?, ?, ?, ?>.Context context,
96 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
97 BspServiceWorker<I, V, E> bspServiceWorker,
98 WorkerInputSplitsHandler splitsHandler) {
99 this.context = context;
100 this.workerClientRequestProcessor =
101 new NettyWorkerClientRequestProcessor<I, V, E>(
102 context, configuration, bspServiceWorker,
103 false
104 this.useLocality = configuration.useInputSplitLocality();
105 this.splitsHandler = splitsHandler;
106 this.configuration = configuration;
107 this.serviceWorker = bspServiceWorker;
108 }
109
110
111
112
113
114
115 public abstract GiraphInputFormat getInputFormat();
116
117
118
119
120
121
122 public abstract InputType getInputType();
123
124
125
126
127
128
129 public static Meter getTotalEdgesLoadedMeter() {
130 return GiraphMetrics.get().perJobRequired()
131 .getMeter(MeterDesc.EDGES_LOADED);
132 }
133
134
135
136
137
138
139 public static Counter getTotalEdgesFilteredCounter() {
140 return GiraphMetrics.get().perJobRequired()
141 .getCounter(MetricNames.EDGES_FILTERED);
142 }
143
144
145
146
147
148
149 public static Meter getTotalVerticesLoadedMeter() {
150 return GiraphMetrics.get().perJobRequired()
151 .getMeter(MeterDesc.VERTICES_LOADED);
152 }
153
154
155
156
157
158
159 public static Counter getTotalVerticesFilteredCounter() {
160 return GiraphMetrics.get().perJobRequired()
161 .getCounter(MetricNames.VERTICES_FILTERED);
162 }
163
164
165
166
167 public static void initMetrics() {
168 GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
169
170 final Counter edgesFiltered = getTotalEdgesFilteredCounter();
171 final Meter edgesLoaded = getTotalEdgesLoadedMeter();
172
173 metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
174 @Override protected double getNumerator() {
175 return edgesFiltered.count();
176 }
177
178 @Override protected double getDenominator() {
179 return edgesLoaded.count();
180 }
181 });
182
183 final Counter verticesFiltered = getTotalVerticesFilteredCounter();
184 final Meter verticesLoaded = getTotalVerticesLoadedMeter();
185
186 metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
187 @Override protected double getNumerator() {
188 return verticesFiltered.count();
189 }
190
191 @Override protected double getDenominator() {
192 return verticesLoaded.count();
193 }
194 });
195 }
196
197
198
199
200
201
202
203
204
205 protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
206 throws IOException, InterruptedException;
207
208 @Override
209 public VertexEdgeCount call() {
210 VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
211 int inputSplitsProcessed = 0;
212 try {
213 OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
214 if (oocEngine != null) {
215 oocEngine.processingThreadStart();
216 }
217 while (true) {
218 byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
219 getInputType(), inputSplitsProcessed == 0);
220 if (serializedInputSplit == null) {
221
222 break;
223 }
224
225
226
227 if (oocEngine != null) {
228 oocEngine.activeThreadCheckIn();
229 }
230 vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
231 loadInputSplit(serializedInputSplit));
232 context.progress();
233 ++inputSplitsProcessed;
234 }
235 if (oocEngine != null) {
236 oocEngine.processingThreadFinish();
237 }
238 } catch (InterruptedException e) {
239 throw new IllegalStateException("call: InterruptedException", e);
240 } catch (IOException e) {
241 throw new IllegalStateException("call: IOException", e);
242 } catch (ClassNotFoundException e) {
243 throw new IllegalStateException("call: ClassNotFoundException", e);
244 }
245
246 if (LOG.isInfoEnabled()) {
247 float seconds = Times.getNanosSince(TIME, startNanos) /
248 Time.NS_PER_SECOND_AS_FLOAT;
249 float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
250 float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
251 LOG.info("call: Loaded " + inputSplitsProcessed + " " +
252 "input splits in " + seconds + " secs, " + vertexEdgeCount +
253 " " + verticesPerSecond + " vertices/sec, " +
254 edgesPerSecond + " edges/sec");
255 }
256 try {
257 workerClientRequestProcessor.flush();
258 } catch (IOException e) {
259 throw new IllegalStateException("call: Flushing failed.", e);
260 }
261 return vertexEdgeCount;
262 }
263
264
265
266
267
268
269
270
271
272
273
274
275
276 private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
277 throws IOException, ClassNotFoundException, InterruptedException {
278 InputSplit inputSplit = getInputSplit(serializedInputSplit);
279 VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
280 if (LOG.isInfoEnabled()) {
281 LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
282 }
283 return vertexEdgeCount;
284 }
285
286
287
288
289
290
291
292
293
294
295 protected InputSplit getInputSplit(byte[] serializedInputSplit)
296 throws IOException, ClassNotFoundException {
297 DataInputStream inputStream =
298 new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
299 InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
300
301 if (LOG.isInfoEnabled()) {
302 LOG.info("getInputSplit: Reserved input split '" +
303 inputSplit.toString() + "'");
304 }
305 return inputSplit;
306 }
307 }