This project has retired. For details please refer to its
Attic page.
GoraVertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.gora;
19
20 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
21 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
22 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
23 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
24 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
25 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
26
27 import java.io.IOException;
28 import java.util.List;
29
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.giraph.io.VertexInputFormat;
32 import org.apache.giraph.io.VertexReader;
33 import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
34 import org.apache.giraph.io.gora.utils.GoraUtils;
35 import org.apache.giraph.io.gora.utils.KeyFactory;
36 import org.apache.gora.persistency.Persistent;
37 import org.apache.gora.query.Result;
38 import org.apache.gora.query.impl.QueryBase;
39 import org.apache.gora.store.DataStore;
40 import org.apache.gora.util.GoraException;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.io.Writable;
43 import org.apache.hadoop.io.WritableComparable;
44 import org.apache.hadoop.mapreduce.InputSplit;
45 import org.apache.hadoop.mapreduce.JobContext;
46 import org.apache.hadoop.mapreduce.TaskAttemptContext;
47 import org.apache.log4j.Logger;
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public abstract class GoraVertexInputFormat<
62 I extends WritableComparable,
63 V extends Writable,
64 E extends Writable>
65 extends VertexInputFormat<I, V, E> {
66
67
68 private static Object START_KEY;
69
70
71 private static Object END_KEY;
72
73
74 private static final Logger LOG =
75 Logger.getLogger(GoraVertexInputFormat.class);
76
77
78 private static Class<?> KEY_CLASS;
79
80
81 private static Class<? extends Persistent> PERSISTENT_CLASS;
82
83
84 private static Class<? extends DataStore> DATASTORE_CLASS;
85
86
87 private static Class<?> KEY_FACTORY_CLASS;
88
89
90 private static DataStore DATA_STORE;
91
92
93 private static int RECORD_COUNTER = 0;
94
95
96 private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
97 new ExtraGoraInputFormat();
98
99
100 public void checkInputSpecs(Configuration conf) {
101 String sDataStoreType =
102 GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
103 String sKeyType =
104 GIRAPH_GORA_KEY_CLASS.get(getConf());
105 String sPersistentType =
106 GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
107 String sKeyFactoryClass =
108 GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
109 try {
110 Class<?> keyClass = Class.forName(sKeyType);
111 Class<?> persistentClass = Class.forName(sPersistentType);
112 Class<?> dataStoreClass = Class.forName(sDataStoreType);
113 Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
114 setKeyClass(keyClass);
115 setPersistentClass((Class<? extends Persistent>) persistentClass);
116 setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
117 setKeyFactoryClass(keyFactoryClass);
118 setDataStore(createDataStore(conf));
119 GORA_INPUT_FORMAT.setDataStore(getDataStore());
120 } catch (ClassNotFoundException e) {
121 LOG.error("Error while reading Gora Input parameters");
122 e.printStackTrace();
123 }
124 }
125
126
127
128
129
130
131
132
133
134
135
136
137 public abstract GoraVertexReader createVertexReader(InputSplit split,
138 TaskAttemptContext context) throws IOException;
139
140
141
142
143
144
145
146 @Override
147 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
148 throws IOException, InterruptedException {
149 KeyFactory kFact = null;
150 try {
151 kFact = (KeyFactory) getKeyFactoryClass().newInstance();
152 kFact.setDataStore(getDataStore());
153 } catch (InstantiationException e) {
154 LOG.error("Key factory was not instantiated. Please verify.");
155 LOG.error(e.getMessage());
156 e.printStackTrace();
157 } catch (IllegalAccessException e) {
158 LOG.error("Key factory was not instantiated. Please verify.");
159 LOG.error(e.getMessage());
160 e.printStackTrace();
161 }
162 String sKey = GIRAPH_GORA_START_KEY.get(getConf());
163 String eKey = GIRAPH_GORA_END_KEY.get(getConf());
164 if (sKey == null || sKey.isEmpty()) {
165 LOG.warn("No start key has been defined.");
166 LOG.warn("Querying all the data store.");
167 sKey = null;
168 eKey = null;
169 } else {
170 setStartKey(kFact.buildKey(sKey));
171 setEndKey(kFact.buildKey(eKey));
172 }
173 QueryBase tmpQuery = GoraUtils.getQuery(
174 getDataStore(), getStartKey(), getEndKey());
175 tmpQuery.setConf(getConf());
176 GORA_INPUT_FORMAT.setQuery(tmpQuery);
177 List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
178 return splits;
179 }
180
181
182
183
184
185
186 public DataStore createDataStore(Configuration conf) {
187 DataStore dsCreated = null;
188 try {
189 dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
190 getKeyClass(), getPersistentClass());
191 } catch (GoraException e) {
192 LOG.error("Error creating data store.");
193 e.printStackTrace();
194 }
195 return dsCreated;
196 }
197
198
199
200
201
202
203 protected abstract class GoraVertexReader extends VertexReader<I, V, E> {
204
205 private Vertex<I, V, E> vertex;
206
207 private Result readResults;
208
209 @Override
210 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
211 throws IOException, InterruptedException {
212 getResults();
213 RECORD_COUNTER = 0;
214 }
215
216
217
218
219
220
221
222 @Override
223
224 public boolean nextVertex() throws IOException, InterruptedException {
225 boolean flg = false;
226 try {
227 flg = this.getReadResults().next();
228 this.vertex = transformVertex(this.getReadResults().get());
229 RECORD_COUNTER++;
230 } catch (Exception e) {
231 LOG.error("Error transforming vertices.");
232 LOG.error(e.getMessage());
233 flg = false;
234 }
235 LOG.debug(RECORD_COUNTER + " were transformed.");
236 return flg;
237 }
238
239
240
241
242
243
244 @Override
245 public float getProgress() throws IOException, InterruptedException {
246 float progress = 0.0f;
247 if (getReadResults() != null) {
248 progress = getReadResults().getProgress();
249 }
250 return progress;
251 }
252
253
254
255
256
257
258 @Override
259 public Vertex<I, V, E> getCurrentVertex()
260 throws IOException, InterruptedException {
261 return this.vertex;
262 }
263
264
265
266
267
268
269
270 protected abstract Vertex<I, V, E> transformVertex(Object goraObject);
271
272
273
274
275 protected void getResults() {
276 setReadResults(GoraUtils.getRequest(getDataStore(),
277 getStartKey(), getEndKey()));
278 }
279
280
281
282
283
284 @Override
285 public void close() throws IOException {
286 }
287
288
289
290
291
292 Result getReadResults() {
293 return readResults;
294 }
295
296
297
298
299
300 void setReadResults(Result readResults) {
301 this.readResults = readResults;
302 }
303 }
304
305
306
307
308
309 static Class<? extends Persistent> getPersistentClass() {
310 return PERSISTENT_CLASS;
311 }
312
313
314
315
316
317 static void setPersistentClass
318 (Class<? extends Persistent> persistentClassUsed) {
319 PERSISTENT_CLASS = persistentClassUsed;
320 }
321
322
323
324
325
326 static Class<?> getKeyClass() {
327 return KEY_CLASS;
328 }
329
330
331
332
333
334 static void setKeyClass(Class<?> keyClassUsed) {
335 KEY_CLASS = keyClassUsed;
336 }
337
338
339
340
341 public static Class<? extends DataStore> getDatastoreClass() {
342 return DATASTORE_CLASS;
343 }
344
345
346
347
348 public static void setDatastoreClass(
349 Class<? extends DataStore> dataStoreClass) {
350 DATASTORE_CLASS = dataStoreClass;
351 }
352
353
354
355
356
357 public Object getStartKey() {
358 return START_KEY;
359 }
360
361
362
363
364
365 public static void setStartKey(Object startKey) {
366 START_KEY = startKey;
367 }
368
369
370
371
372
373 static Object getEndKey() {
374 return END_KEY;
375 }
376
377
378
379
380
381 static void setEndKey(Object pEndKey) {
382 END_KEY = pEndKey;
383 }
384
385
386
387
388
389 static Class<?> getKeyFactoryClass() {
390 return KEY_FACTORY_CLASS;
391 }
392
393
394
395
396
397 static void setKeyFactoryClass(Class<?> keyFactoryClass) {
398 KEY_FACTORY_CLASS = keyFactoryClass;
399 }
400
401
402
403
404
405 public static DataStore getDataStore() {
406 return DATA_STORE;
407 }
408
409
410
411
412
413 public static void setDataStore(DataStore dStore) {
414 DATA_STORE = dStore;
415 }
416 }