This project has retired. For details please refer to its Attic page.
GoraVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
21  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
22  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
23  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
24  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
25  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
26  
27  import java.io.IOException;
28  import java.util.List;
29  
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.giraph.io.VertexInputFormat;
32  import org.apache.giraph.io.VertexReader;
33  import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
34  import org.apache.giraph.io.gora.utils.GoraUtils;
35  import org.apache.giraph.io.gora.utils.KeyFactory;
36  import org.apache.gora.persistency.Persistent;
37  import org.apache.gora.query.Result;
38  import org.apache.gora.query.impl.QueryBase;
39  import org.apache.gora.store.DataStore;
40  import org.apache.gora.util.GoraException;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.io.Writable;
43  import org.apache.hadoop.io.WritableComparable;
44  import org.apache.hadoop.mapreduce.InputSplit;
45  import org.apache.hadoop.mapreduce.JobContext;
46  import org.apache.hadoop.mapreduce.TaskAttemptContext;
47  import org.apache.log4j.Logger;
48  
49  /**
50   *  Class which wraps the GoraInputFormat. It's designed
51   *  as an extension point to VertexInputFormat subclasses who wish
52   *  to read from Gora data sources.
53   *
54   *  Works with
55   *  {@link GoraVertexOutputFormat}
56   *
57   * @param <I> vertex id type
58   * @param <V>  vertex value type
59   * @param <E>  edge type
60   */
61  public abstract class GoraVertexInputFormat<
62          I extends WritableComparable,
63          V extends Writable,
64          E extends Writable>
65          extends VertexInputFormat<I, V, E> {
66  
67    /** Start key for querying Gora data store. */
68    private static Object START_KEY;
69  
70    /** End key for querying Gora data store. */
71    private static Object END_KEY;
72  
73    /** Logger for Gora's vertex input format. */
74    private static final Logger LOG =
75            Logger.getLogger(GoraVertexInputFormat.class);
76  
77    /** KeyClass used for getting data. */
78    private static Class<?> KEY_CLASS;
79  
80    /** The vertex itself will be used as a value inside Gora. */
81    private static Class<? extends Persistent> PERSISTENT_CLASS;
82  
83    /** Data store class to be used as backend. */
84    private static Class<? extends DataStore> DATASTORE_CLASS;
85  
86    /** Class used to transform strings into Keys */
87    private static Class<?> KEY_FACTORY_CLASS;
88  
89    /** Data store used for querying data. */
90    private static DataStore DATA_STORE;
91  
92    /** counter for iinput records */
93    private static int RECORD_COUNTER = 0;
94  
95    /** Delegate Gora input format */
96    private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
97           new ExtraGoraInputFormat();
98  
99    /** @param conf configuration parameters */
100   public void checkInputSpecs(Configuration conf) {
101     String sDataStoreType =
102         GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
103     String sKeyType =
104         GIRAPH_GORA_KEY_CLASS.get(getConf());
105     String sPersistentType =
106         GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
107     String sKeyFactoryClass =
108         GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
109     try {
110       Class<?> keyClass = Class.forName(sKeyType);
111       Class<?> persistentClass = Class.forName(sPersistentType);
112       Class<?> dataStoreClass = Class.forName(sDataStoreType);
113       Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
114       setKeyClass(keyClass);
115       setPersistentClass((Class<? extends Persistent>) persistentClass);
116       setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
117       setKeyFactoryClass(keyFactoryClass);
118       setDataStore(createDataStore(conf));
119       GORA_INPUT_FORMAT.setDataStore(getDataStore());
120     } catch (ClassNotFoundException e) {
121       LOG.error("Error while reading Gora Input parameters");
122       e.printStackTrace();
123     }
124   }
125 
126   /**
127    * Create a vertex reader for a given split. Guaranteed to have been
128    * configured with setConf() prior to use. The framework will also call
129    * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
130    * the split is used.
131    *
132    * @param split the split to be read
133    * @param context the information about the task
134    * @return a new record reader
135    * @throws IOException
136    */
137   public abstract GoraVertexReader createVertexReader(InputSplit split,
138     TaskAttemptContext context) throws IOException;
139 
140   /**
141    * Gets the splits for a data store.
142    * @param context JobContext
143    * @param minSplitCountHint Hint for a minimum split count
144    * @return A list of splits
145    */
146   @Override
147   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
148     throws IOException, InterruptedException {
149     KeyFactory kFact = null;
150     try {
151       kFact = (KeyFactory) getKeyFactoryClass().newInstance();
152       kFact.setDataStore(getDataStore());
153     } catch (InstantiationException e) {
154       LOG.error("Key factory was not instantiated. Please verify.");
155       LOG.error(e.getMessage());
156       e.printStackTrace();
157     } catch (IllegalAccessException e) {
158       LOG.error("Key factory was not instantiated. Please verify.");
159       LOG.error(e.getMessage());
160       e.printStackTrace();
161     }
162     String sKey = GIRAPH_GORA_START_KEY.get(getConf());
163     String eKey = GIRAPH_GORA_END_KEY.get(getConf());
164     if (sKey == null || sKey.isEmpty()) {
165       LOG.warn("No start key has been defined.");
166       LOG.warn("Querying all the data store.");
167       sKey = null;
168       eKey = null;
169     } else {
170       setStartKey(kFact.buildKey(sKey));
171       setEndKey(kFact.buildKey(eKey));
172     }
173     QueryBase tmpQuery = GoraUtils.getQuery(
174         getDataStore(), getStartKey(), getEndKey());
175     tmpQuery.setConf(getConf());
176     GORA_INPUT_FORMAT.setQuery(tmpQuery);
177     List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
178     return splits;
179   }
180 
181   /**
182    * Gets the data store object initialized.
183    * @param conf Configuration
184    * @return DataStore created
185    */
186   public DataStore createDataStore(Configuration conf) {
187     DataStore dsCreated = null;
188     try {
189       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
190           getKeyClass(), getPersistentClass());
191     } catch (GoraException e) {
192       LOG.error("Error creating data store.");
193       e.printStackTrace();
194     }
195     return dsCreated;
196   }
197 
198   /**
199    * Abstract class to be implemented by the user based on their specific
200    * vertex input. Easiest to ignore the key value separator and only use
201    * key instead.
202    */
203   protected abstract class GoraVertexReader extends VertexReader<I, V, E> {
204     /** Current vertex */
205     private Vertex<I, V, E> vertex;
206     /** Results gotten from Gora data store. */
207     private Result readResults;
208 
209     @Override
210     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
211       throws IOException, InterruptedException {
212       getResults();
213       RECORD_COUNTER = 0;
214     }
215 
216     /**
217      * Gets the next vertex from Gora data store.
218      * @return true/false depending on the existence of vertices.
219      * @throws IOException exceptions passed along.
220      * @throws InterruptedException exceptions passed along.
221      */
222     @Override
223     // CHECKSTYLE: stop IllegalCatch
224     public boolean nextVertex() throws IOException, InterruptedException {
225       boolean flg = false;
226       try {
227         flg = this.getReadResults().next();
228         this.vertex = transformVertex(this.getReadResults().get());
229         RECORD_COUNTER++;
230       } catch (Exception e) {
231         LOG.error("Error transforming vertices.");
232         LOG.error(e.getMessage());
233         flg = false;
234       }
235       LOG.debug(RECORD_COUNTER + " were transformed.");
236       return flg;
237     }
238     // CHECKSTYLE: resume IllegalCatch
239 
240     /**
241      * Gets the progress of reading results from Gora.
242      * @return the progress of reading results from Gora.
243      */
244     @Override
245     public float getProgress() throws IOException, InterruptedException {
246       float progress = 0.0f;
247       if (getReadResults() != null) {
248         progress = getReadResults().getProgress();
249       }
250       return progress;
251     }
252 
253     /**
254      * Gets current vertex.
255      *
256      * @return  The vertex object represented by a Gora object
257      */
258     @Override
259     public Vertex<I, V, E> getCurrentVertex()
260       throws IOException, InterruptedException {
261       return this.vertex;
262     }
263 
264     /**
265      * Parser for a single Gora object
266      *
267      * @param   goraObject vertex represented as a GoraObject
268      * @return  The vertex object represented by a Gora object
269      */
270     protected abstract Vertex<I, V, E> transformVertex(Object goraObject);
271 
272     /**
273      * Performs a range query to a Gora data store.
274      */
275     protected void getResults() {
276       setReadResults(GoraUtils.getRequest(getDataStore(),
277           getStartKey(), getEndKey()));
278     }
279 
280     /**
281      * Finishes the reading process.
282      * @throws IOException
283      */
284     @Override
285     public void close() throws IOException {
286     }
287 
288     /**
289      * Gets the results read.
290      * @return results read.
291      */
292     Result getReadResults() {
293       return readResults;
294     }
295 
296     /**
297      * Sets the results read.
298      * @param readResults results read.
299      */
300     void setReadResults(Result readResults) {
301       this.readResults = readResults;
302     }
303   }
304 
305   /**
306    * Gets the persistent Class
307    * @return persistentClass used
308    */
309   static Class<? extends Persistent> getPersistentClass() {
310     return PERSISTENT_CLASS;
311   }
312 
313   /**
314    * Sets the persistent Class
315    * @param persistentClassUsed to be set
316    */
317   static void setPersistentClass
318   (Class<? extends Persistent> persistentClassUsed) {
319     PERSISTENT_CLASS = persistentClassUsed;
320   }
321 
322   /**
323    * Gets the key class used.
324    * @return the key class used.
325    */
326   static Class<?> getKeyClass() {
327     return KEY_CLASS;
328   }
329 
330   /**
331    * Sets the key class used.
332    * @param keyClassUsed key class used.
333    */
334   static void setKeyClass(Class<?> keyClassUsed) {
335     KEY_CLASS = keyClassUsed;
336   }
337 
338   /**
339    * @return Class the DATASTORE_CLASS
340    */
341   public static Class<? extends DataStore> getDatastoreClass() {
342     return DATASTORE_CLASS;
343   }
344 
345   /**
346    * @param dataStoreClass the dataStore class to set
347    */
348   public static void setDatastoreClass(
349       Class<? extends DataStore> dataStoreClass) {
350     DATASTORE_CLASS = dataStoreClass;
351   }
352 
353   /**
354    * Gets the start key for querying.
355    * @return the start key.
356    */
357   public Object getStartKey() {
358     return START_KEY;
359   }
360 
361   /**
362    * Gets the start key for querying.
363    * @param startKey start key.
364    */
365   public static void setStartKey(Object startKey) {
366     START_KEY = startKey;
367   }
368 
369   /**
370    * Gets the end key for querying.
371    * @return the end key.
372    */
373   static Object getEndKey() {
374     return END_KEY;
375   }
376 
377   /**
378    * Sets the end key for querying.
379    * @param pEndKey start key.
380    */
381   static void setEndKey(Object pEndKey) {
382     END_KEY = pEndKey;
383   }
384 
385   /**
386    * Gets the key factory class.
387    * @return the kEY_FACTORY_CLASS
388    */
389   static Class<?> getKeyFactoryClass() {
390     return KEY_FACTORY_CLASS;
391   }
392 
393   /**
394    * Sets the key factory class.
395    * @param keyFactoryClass the keyFactoryClass to set.
396    */
397   static void setKeyFactoryClass(Class<?> keyFactoryClass) {
398     KEY_FACTORY_CLASS = keyFactoryClass;
399   }
400 
401   /**
402    * Gets the data store.
403    * @return DataStore
404    */
405   public static DataStore getDataStore() {
406     return DATA_STORE;
407   }
408 
409   /**
410    * Sets the data store
411    * @param dStore the dATA_STORE to set
412    */
413   public static void setDataStore(DataStore dStore) {
414     DATA_STORE = dStore;
415   }
416 }