View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
21  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
22  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
23  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
24  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
25  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
26  
27  import java.io.IOException;
28  import java.util.List;
29  
30  import org.apache.giraph.edge.Edge;
31  import org.apache.giraph.io.EdgeInputFormat;
32  import org.apache.giraph.io.EdgeReader;
33  import org.apache.giraph.io.gora.utils.ExtraGoraInputFormat;
34  import org.apache.giraph.io.gora.utils.GoraUtils;
35  import org.apache.giraph.io.gora.utils.KeyFactory;
36  import org.apache.gora.persistency.Persistent;
37  import org.apache.gora.query.Result;
38  import org.apache.gora.query.impl.QueryBase;
39  import org.apache.gora.store.DataStore;
40  import org.apache.gora.util.GoraException;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.io.Writable;
43  import org.apache.hadoop.io.WritableComparable;
44  import org.apache.hadoop.mapreduce.InputSplit;
45  import org.apache.hadoop.mapreduce.JobContext;
46  import org.apache.hadoop.mapreduce.TaskAttemptContext;
47  import org.apache.log4j.Logger;
48  
49  /**
50   *  Class which wraps the GoraInputFormat. It's designed
51   *  as an extension point to EdgeInputFormat subclasses who wish
52   *  to read from Gora data sources.
53   *
54   *  Works with
55   *  {@link GoraVertexOutputFormat}
56   *
57   * @param <I> vertex id type
58   * @param <E>  edge type
59   */
60  public abstract class GoraEdgeInputFormat
61    <I extends WritableComparable, E extends Writable>
62    extends EdgeInputFormat<I, E> {
63  
64    /** Start key for querying Gora data store. */
65    private static Object START_KEY;
66  
67    /** End key for querying Gora data store. */
68    private static Object END_KEY;
69  
70    /** Logger for Gora's vertex input format. */
71    private static final Logger LOG =
72            Logger.getLogger(GoraEdgeInputFormat.class);
73  
74    /** KeyClass used for getting data. */
75    private static Class<?> KEY_CLASS;
76  
77    /** The vertex itself will be used as a value inside Gora. */
78    private static Class<? extends Persistent> PERSISTENT_CLASS;
79  
80    /** Data store class to be used as backend. */
81    private static Class<? extends DataStore> DATASTORE_CLASS;
82  
83    /** Class used to transform strings into Keys */
84    private static Class<?> KEY_FACTORY_CLASS;
85  
86    /** Data store used for querying data. */
87    private static DataStore DATA_STORE;
88  
89    /** counter for iinput records */
90    private static int RECORD_COUNTER = 0;
91  
92    /** Delegate Gora input format */
93    private static ExtraGoraInputFormat GORA_INPUT_FORMAT =
94           new ExtraGoraInputFormat();
95  
96    /**
97     * @param conf configuration parameters
98     */
99    public void checkInputSpecs(Configuration conf) {
100     String sDataStoreType =
101         GIRAPH_GORA_DATASTORE_CLASS.get(getConf());
102     String sKeyType =
103         GIRAPH_GORA_KEY_CLASS.get(getConf());
104     String sPersistentType =
105         GIRAPH_GORA_PERSISTENT_CLASS.get(getConf());
106     String sKeyFactoryClass =
107         GIRAPH_GORA_KEYS_FACTORY_CLASS.get(getConf());
108     try {
109       Class<?> keyClass = Class.forName(sKeyType);
110       Class<?> persistentClass = Class.forName(sPersistentType);
111       Class<?> dataStoreClass = Class.forName(sDataStoreType);
112       Class<?> keyFactoryClass = Class.forName(sKeyFactoryClass);
113       setKeyClass(keyClass);
114       setPersistentClass((Class<? extends Persistent>) persistentClass);
115       setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
116       setKeyFactoryClass(keyFactoryClass);
117       setDataStore(createDataStore(getConf()));
118       GORA_INPUT_FORMAT.setDataStore(getDataStore());
119     } catch (ClassNotFoundException e) {
120       LOG.error("Error while reading Gora Input parameters");
121       e.printStackTrace();
122     }
123   }
124 
125   /**
126    * Gets the splits for a data store.
127    * @param context JobContext
128    * @param minSplitCountHint Hint for a minimum split count
129    * @return A list of splits
130    */
131   @Override
132   public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
133     throws IOException, InterruptedException {
134     KeyFactory kFact = null;
135     try {
136       kFact = (KeyFactory) getKeyFactoryClass().newInstance();
137     } catch (InstantiationException e) {
138       LOG.error("Key factory was not instantiated. Please verify.");
139       LOG.error(e.getMessage());
140       e.printStackTrace();
141     } catch (IllegalAccessException e) {
142       LOG.error("Key factory was not instantiated. Please verify.");
143       LOG.error(e.getMessage());
144       e.printStackTrace();
145     }
146     String sKey = GIRAPH_GORA_START_KEY.get(getConf());
147     String eKey = GIRAPH_GORA_END_KEY.get(getConf());
148     if (sKey == null || sKey.isEmpty()) {
149       LOG.warn("No start key has been defined.");
150       LOG.warn("Querying all the data store.");
151       sKey = null;
152       eKey = null;
153     }
154     kFact.setDataStore(getDataStore());
155     setStartKey(kFact.buildKey(sKey));
156     setEndKey(kFact.buildKey(eKey));
157     QueryBase tmpQuery = GoraUtils.getQuery(
158         getDataStore(), getStartKey(), getEndKey());
159     tmpQuery.setConf(context.getConfiguration());
160     GORA_INPUT_FORMAT.setQuery(tmpQuery);
161     List<InputSplit> splits = GORA_INPUT_FORMAT.getSplits(context);
162     return splits;
163   }
164 
165   @Override
166   public abstract GoraEdgeReader createEdgeReader(InputSplit split,
167       TaskAttemptContext context) throws IOException;
168 
169   /**
170    * Abstract class to be implemented by the user based on their specific
171    * vertex input. Easiest to ignore the key value separator and only use
172    * key instead.
173    */
174   protected abstract class GoraEdgeReader extends EdgeReader<I, E> {
175     /** current edge obtained from Rexster */
176     private Edge<I, E> edge;
177     /** Results gotten from Gora data store. */
178     private Result readResults;
179 
180     @Override
181     public void initialize(InputSplit inputSplit, TaskAttemptContext context)
182       throws IOException, InterruptedException {
183       getResults();
184       RECORD_COUNTER = 0;
185     }
186 
187     /**
188      * Gets the next edge from Gora data store.
189      * @return true/false depending on the existence of vertices.
190      * @throws IOException exceptions passed along.
191      * @throws InterruptedException exceptions passed along.
192      */
193     @Override
194     // CHECKSTYLE: stop IllegalCatch
195     public boolean nextEdge() throws IOException, InterruptedException {
196       boolean flg = false;
197       try {
198         flg = this.getReadResults().next();
199         this.edge = transformEdge(this.getReadResults().get());
200         RECORD_COUNTER++;
201       } catch (Exception e) {
202         LOG.debug("Error transforming vertices.");
203         flg = false;
204       }
205       LOG.debug(RECORD_COUNTER + " were transformed.");
206       return flg;
207     }
208     // CHECKSTYLE: resume IllegalCatch
209 
210     /**
211      * Gets the progress of reading results from Gora.
212      * @return the progress of reading results from Gora.
213      */
214     @Override
215     public float getProgress() throws IOException, InterruptedException {
216       float progress = 0.0f;
217       if (getReadResults() != null) {
218         progress = getReadResults().getProgress();
219       }
220       return progress;
221     }
222 
223     /**
224      * Gets current edge.
225      *
226      * @return  The edge object represented by a Gora object
227      */
228     @Override
229     public Edge<I, E> getCurrentEdge()
230       throws IOException, InterruptedException {
231       return this.edge;
232     }
233 
234     /**
235      * Parser for a single Gora object
236      *
237      * @param   goraObject vertex represented as a GoraObject
238      * @return  The edge object represented by a Gora object
239      */
240     protected abstract Edge<I, E> transformEdge(Object goraObject);
241 
242     /**
243      * Performs a range query to a Gora data store.
244      */
245     protected void getResults() {
246       setReadResults(GoraUtils.getRequest(getDataStore(),
247           getStartKey(), getEndKey()));
248     }
249 
250     /**
251      * Finishes the reading process.
252      * @throws IOException
253      */
254     @Override
255     public void close() throws IOException {
256     }
257 
258     /**
259      * Gets the results read.
260      * @return results read.
261      */
262     Result getReadResults() {
263       return readResults;
264     }
265 
266     /**
267      * Sets the results read.
268      * @param readResults results read.
269      */
270     void setReadResults(Result readResults) {
271       this.readResults = readResults;
272     }
273   }
274 
275   /**
276    * Gets the data store object initialized.
277    * @param conf Configuration
278    * @return DataStore created
279    */
280   public DataStore createDataStore(Configuration conf) {
281     DataStore dsCreated = null;
282     try {
283       dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
284           getKeyClass(), getPersistentClass());
285     } catch (GoraException e) {
286       LOG.error("Error creating data store.");
287       e.printStackTrace();
288     }
289     return dsCreated;
290   }
291 
292   /**
293    * Gets the persistent Class
294    * @return persistentClass used
295    */
296   static Class<? extends Persistent> getPersistentClass() {
297     return PERSISTENT_CLASS;
298   }
299 
300   /**
301    * Sets the persistent Class
302    * @param persistentClassUsed to be set
303    */
304   static void setPersistentClass
305   (Class<? extends Persistent> persistentClassUsed) {
306     PERSISTENT_CLASS = persistentClassUsed;
307   }
308 
309   /**
310    * Gets the key class used.
311    * @return the key class used.
312    */
313   static Class<?> getKeyClass() {
314     return KEY_CLASS;
315   }
316 
317   /**
318    * Sets the key class used.
319    * @param keyClassUsed key class used.
320    */
321   static void setKeyClass(Class<?> keyClassUsed) {
322     KEY_CLASS = keyClassUsed;
323   }
324 
325   /**
326    * @return Class the DATASTORE_CLASS
327    */
328   public static Class<? extends DataStore> getDatastoreClass() {
329     return DATASTORE_CLASS;
330   }
331 
332   /**
333    * @param dataStoreClass the dataStore class to set
334    */
335   public static void setDatastoreClass(
336       Class<? extends DataStore> dataStoreClass) {
337     DATASTORE_CLASS = dataStoreClass;
338   }
339 
340   /**
341    * Gets the start key for querying.
342    * @return the start key.
343    */
344   public Object getStartKey() {
345     return START_KEY;
346   }
347 
348   /**
349    * Gets the start key for querying.
350    * @param startKey start key.
351    */
352   public static void setStartKey(Object startKey) {
353     START_KEY = startKey;
354   }
355 
356   /**
357    * Gets the end key for querying.
358    * @return the end key.
359    */
360   static Object getEndKey() {
361     return END_KEY;
362   }
363 
364   /**
365    * Sets the end key for querying.
366    * @param pEndKey start key.
367    */
368   static void setEndKey(Object pEndKey) {
369     END_KEY = pEndKey;
370   }
371 
372   /**
373    * Gets the key factory class.
374    * @return the kEY_FACTORY_CLASS
375    */
376   static Class<?> getKeyFactoryClass() {
377     return KEY_FACTORY_CLASS;
378   }
379 
380   /**
381    * Sets the key factory class.
382    * @param keyFactoryClass the keyFactoryClass to set.
383    */
384   static void setKeyFactoryClass(Class<?> keyFactoryClass) {
385     KEY_FACTORY_CLASS = keyFactoryClass;
386   }
387 
388   /**
389    * Gets the data store.
390    * @return DataStore
391    */
392   public static DataStore getDataStore() {
393     return DATA_STORE;
394   }
395 
396   /**
397    * Sets the data store
398    * @param dStore the dATA_STORE to set
399    */
400   public static void setDataStore(DataStore dStore) {
401     DATA_STORE = dStore;
402   }
403 
404   /**
405    * Returns a logger.
406    * @return the log for the output format.
407    */
408   public static Logger getLogger() {
409     return LOG;
410   }
411 }