This project has retired. For details please refer to its
Attic page.
GoraEdgeInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.gora;
19
20 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
21 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
22 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
23 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
24 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
25 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
26
27 import java.io.IOException;
28 import java.util.List;
29
30 import org.apache.giraph.edge.Edge;
31 import org.apache.giraph.io.EdgeInputFormat;
32 import org.apache.giraph.io.EdgeReader;
33 import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
34 import org.apache.giraph.io.gora.utils.GoraUtils;
35 import org.apache.giraph.io.gora.utils.KeyFactory;
36 import org.apache.gora.persistency.Persistent;
37 import org.apache.gora.query.Result;
38 import org.apache.gora.query.impl.QueryBase;
39 import org.apache.gora.store.DataStore;
40 import org.apache.gora.util.GoraException;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.io.Writable;
43 import org.apache.hadoop.io.WritableComparable;
44 import org.apache.hadoop.mapreduce.InputSplit;
45 import org.apache.hadoop.mapreduce.JobContext;
46 import org.apache.hadoop.mapreduce.TaskAttemptContext;
47 import org.apache.log4j.Logger;
48
49
50
51
52
53
54
55
56
57
58
59
60 public abstract class GoraEdgeInputFormat
61 <I extends WritableComparable, E extends Writable>
62 extends EdgeInputFormat<I, E> {
63
64
65 private static Object START_KEY;
66
67
68 private static Object END_KEY;
69
70
71 private static final Logger LOG =
72 Logger.getLogger(GoraEdgeInputFormat.class);
73
74
75 private static Class<?> KEY_CLASS;
76
77
78 private static Class<? extends Persistent> PERSISTENT_CLASS;
79
80
81 private static Class<? extends DataStore> DATASTORE_CLASS;
82
83
84 private static Class<?> KEY_FACTORY_CLASS;
85
86
87 private static DataStore DATA_STORE;
88
89
90 private static int RECORD_COUNTER = 0;
91
92
93 private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
94 new ExtraGoraInputFormat();
95
96
97
98
99 public void checkInputSpecs(Configuration conf) {
100 String sDataStoreType =
101 GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
102 String sKeyType =
103 GIRAPH_GORA_KEY_CLASS.get(getConf());
104 String sPersistentType =
105 GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
106 String sKeyFactoryClass =
107 GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
108 try {
109 Class<?> keyClass = Class.forName(sKeyType);
110 Class<?> persistentClass = Class.forName(sPersistentType);
111 Class<?> dataStoreClass = Class.forName(sDataStoreType);
112 Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
113 setKeyClass(keyClass);
114 setPersistentClass((Class<? extends Persistent>) persistentClass);
115 setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
116 setKeyFactoryClass(keyFactoryClass);
117 setDataStore(createDataStore(getConf()));
118 GORA_INPUT_FORMAT.setDataStore(getDataStore());
119 } catch (ClassNotFoundException e) {
120 LOG.error("Error while reading Gora Input parameters");
121 e.printStackTrace();
122 }
123 }
124
125
126
127
128
129
130
131 @Override
132 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
133 throws IOException, InterruptedException {
134 KeyFactory kFact = null;
135 try {
136 kFact = (KeyFactory) getKeyFactoryClass().newInstance();
137 } catch (InstantiationException e) {
138 LOG.error("Key factory was not instantiated. Please verify.");
139 LOG.error(e.getMessage());
140 e.printStackTrace();
141 } catch (IllegalAccessException e) {
142 LOG.error("Key factory was not instantiated. Please verify.");
143 LOG.error(e.getMessage());
144 e.printStackTrace();
145 }
146 String sKey = GIRAPH_GORA_START_KEY.get(getConf());
147 String eKey = GIRAPH_GORA_END_KEY.get(getConf());
148 if (sKey == null || sKey.isEmpty()) {
149 LOG.warn("No start key has been defined.");
150 LOG.warn("Querying all the data store.");
151 sKey = null;
152 eKey = null;
153 }
154 kFact.setDataStore(getDataStore());
155 setStartKey(kFact.buildKey(sKey));
156 setEndKey(kFact.buildKey(eKey));
157 QueryBase tmpQuery = GoraUtils.getQuery(
158 getDataStore(), getStartKey(), getEndKey());
159 tmpQuery.setConf(context.getConfiguration());
160 GORA_INPUT_FORMAT.setQuery(tmpQuery);
161 List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
162 return splits;
163 }
164
165 @Override
166 public abstract GoraEdgeReader createEdgeReader(InputSplit split,
167 TaskAttemptContext context) throws IOException;
168
169
170
171
172
173
174 protected abstract class GoraEdgeReader extends EdgeReader<I, E> {
175
176 private Edge<I, E> edge;
177
178 private Result readResults;
179
180 @Override
181 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
182 throws IOException, InterruptedException {
183 getResults();
184 RECORD_COUNTER = 0;
185 }
186
187
188
189
190
191
192
193 @Override
194
195 public boolean nextEdge() throws IOException, InterruptedException {
196 boolean flg = false;
197 try {
198 flg = this.getReadResults().next();
199 this.edge = transformEdge(this.getReadResults().get());
200 RECORD_COUNTER++;
201 } catch (Exception e) {
202 LOG.debug("Error transforming vertices.");
203 flg = false;
204 }
205 LOG.debug(RECORD_COUNTER + " were transformed.");
206 return flg;
207 }
208
209
210
211
212
213
214 @Override
215 public float getProgress() throws IOException, InterruptedException {
216 float progress = 0.0f;
217 if (getReadResults() != null) {
218 progress = getReadResults().getProgress();
219 }
220 return progress;
221 }
222
223
224
225
226
227
228 @Override
229 public Edge<I, E> getCurrentEdge()
230 throws IOException, InterruptedException {
231 return this.edge;
232 }
233
234
235
236
237
238
239
240 protected abstract Edge<I, E> transformEdge(Object goraObject);
241
242
243
244
245 protected void getResults() {
246 setReadResults(GoraUtils.getRequest(getDataStore(),
247 getStartKey(), getEndKey()));
248 }
249
250
251
252
253
254 @Override
255 public void close() throws IOException {
256 }
257
258
259
260
261
262 Result getReadResults() {
263 return readResults;
264 }
265
266
267
268
269
270 void setReadResults(Result readResults) {
271 this.readResults = readResults;
272 }
273 }
274
275
276
277
278
279
280 public DataStore createDataStore(Configuration conf) {
281 DataStore dsCreated = null;
282 try {
283 dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
284 getKeyClass(), getPersistentClass());
285 } catch (GoraException e) {
286 LOG.error("Error creating data store.");
287 e.printStackTrace();
288 }
289 return dsCreated;
290 }
291
292
293
294
295
296 static Class<? extends Persistent> getPersistentClass() {
297 return PERSISTENT_CLASS;
298 }
299
300
301
302
303
304 static void setPersistentClass
305 (Class<? extends Persistent> persistentClassUsed) {
306 PERSISTENT_CLASS = persistentClassUsed;
307 }
308
309
310
311
312
313 static Class<?> getKeyClass() {
314 return KEY_CLASS;
315 }
316
317
318
319
320
321 static void setKeyClass(Class<?> keyClassUsed) {
322 KEY_CLASS = keyClassUsed;
323 }
324
325
326
327
328 public static Class<? extends DataStore> getDatastoreClass() {
329 return DATASTORE_CLASS;
330 }
331
332
333
334
335 public static void setDatastoreClass(
336 Class<? extends DataStore> dataStoreClass) {
337 DATASTORE_CLASS = dataStoreClass;
338 }
339
340
341
342
343
344 public Object getStartKey() {
345 return START_KEY;
346 }
347
348
349
350
351
352 public static void setStartKey(Object startKey) {
353 START_KEY = startKey;
354 }
355
356
357
358
359
360 static Object getEndKey() {
361 return END_KEY;
362 }
363
364
365
366
367
368 static void setEndKey(Object pEndKey) {
369 END_KEY = pEndKey;
370 }
371
372
373
374
375
376 static Class<?> getKeyFactoryClass() {
377 return KEY_FACTORY_CLASS;
378 }
379
380
381
382
383
384 static void setKeyFactoryClass(Class<?> keyFactoryClass) {
385 KEY_FACTORY_CLASS = keyFactoryClass;
386 }
387
388
389
390
391
392 public static DataStore getDataStore() {
393 return DATA_STORE;
394 }
395
396
397
398
399
400 public static void setDataStore(DataStore dStore) {
401 DATA_STORE = dStore;
402 }
403
404
405
406
407
408 public static Logger getLogger() {
409 return LOG;
410 }
411 }