View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
21  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
22  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
23  
24  import java.io.IOException;
25  
26  import org.apache.giraph.edge.Edge;
27  import org.apache.giraph.io.EdgeOutputFormat;
28  import org.apache.giraph.io.EdgeWriter;
29  import org.apache.giraph.io.gora.utils.GoraUtils;
30  import org.apache.gora.persistency.Persistent;
31  import org.apache.gora.store.DataStore;
32  import org.apache.gora.util.GoraException;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.JobContext;
37  import org.apache.hadoop.mapreduce.OutputCommitter;
38  import org.apache.hadoop.mapreduce.TaskAttemptContext;
39  import org.apache.log4j.Logger;
40  
41  /**
42   *  Class which wraps the GoraInputFormat. It's designed
43   *  as an extension point to EdgeOutputFormat subclasses who wish
44   *  to write to Gora data sources.
45   *
46   *  Works with
47   *  {@link GoraEdgeInputFormat}
48   *
49   * @param <I> edge id type
50   * @param <V>  vertex type
51   * @param <E>  edge type
52   */
53  public abstract class GoraEdgeOutputFormat<I extends WritableComparable,
54    V extends Writable, E extends Writable>
55    extends EdgeOutputFormat<I, V, E> {
56  
57    /** Logger for Gora's vertex input format. */
58    private static final Logger LOG =
59            Logger.getLogger(GoraEdgeOutputFormat.class);
60  
61    /** KeyClass used for getting data. */
62    private static Class<?> KEY_CLASS;
63  
64    /** The vertex itself will be used as a value inside Gora. */
65    private static Class<? extends Persistent> PERSISTENT_CLASS;
66  
67    /** Data store class to be used as backend. */
68    private static Class<? extends DataStore> DATASTORE_CLASS;
69  
70    /** Data store used for querying data. */
71    private static DataStore DATA_STORE;
72  
73    /**
74     * checkOutputSpecs
75     *
76     * @param context information about the job
77     * @throws IOException
78     * @throws InterruptedException
79     */
80    @Override
81    public void checkOutputSpecs(JobContext context)
82      throws IOException, InterruptedException {
83    }
84  
85    /**
86     * Gets the data store object initialized.
87     * @param conf Configuration
88     * @return DataStore created
89     */
90    public DataStore createDataStore(Configuration conf) {
91      DataStore dsCreated = null;
92      try {
93        dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
94            getKeyClass(), getPersistentClass());
95      } catch (GoraException e) {
96        getLogger().error("Error creating data store.");
97        e.printStackTrace();
98      }
99      return dsCreated;
100   }
101 
102   @Override
103   public abstract GoraEdgeWriter
104   createEdgeWriter(TaskAttemptContext context)
105     throws IOException, InterruptedException;
106 
107   /**
108    * getOutputCommitter
109    *
110    * @param context the task context
111    * @return OutputCommitter
112    * @throws IOException
113    * @throws InterruptedException
114    */
115   @Override
116   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
117     throws IOException, InterruptedException {
118     return new NullOutputCommitter();
119   }
120 
121   /**
122    * Empty output commiter for hadoop.
123    */
124   private static class NullOutputCommitter extends OutputCommitter {
125     @Override
126     public void abortTask(TaskAttemptContext arg0) throws IOException {    }
127 
128     @Override
129     public void commitTask(TaskAttemptContext arg0) throws IOException {    }
130 
131     @Override
132     public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
133       return false;
134     }
135 
136     @Override
137     public void setupJob(JobContext arg0) throws IOException {    }
138 
139     @Override
140     public void setupTask(TaskAttemptContext arg0) throws IOException {    }
141   }
142 
143   /**
144    * Abstract class to be implemented by the user based on their specific
145    * vertex/edges output.
146    */
147   protected abstract class GoraEdgeWriter extends EdgeWriter<I, V, E> {
148     @Override
149     public void initialize(TaskAttemptContext context) throws IOException,
150       InterruptedException {
151       String sDataStoreType =
152           GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
153       String sKeyType =
154           GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
155       String sPersistentType =
156           GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
157       try {
158         Class<?> keyClass = Class.forName(sKeyType);
159         Class<?> persistentClass = Class.forName(sPersistentType);
160         Class<?> dataStoreClass = Class.forName(sDataStoreType);
161         setKeyClass(keyClass);
162         setPersistentClass((Class<? extends Persistent>) persistentClass);
163         setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
164         setDataStore(createDataStore(context.getConfiguration()));
165         if (getDataStore() != null) {
166           getLogger().debug("The data store has been created.");
167         }
168       } catch (ClassNotFoundException e) {
169         getLogger().error("Error while reading Gora Output parameters");
170         e.printStackTrace();
171       }
172     }
173 
174     @Override
175     public void close(TaskAttemptContext context)
176       throws IOException, InterruptedException {
177       getDataStore().flush();
178       getDataStore().close();
179     }
180 
181     @Override
182     public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
183       throws IOException, InterruptedException {
184       Persistent goraEdge = null;
185       Object goraKey = getGoraKey(srcId, srcValue, edge);
186       goraEdge = getGoraEdge(srcId, srcValue, edge);
187       getDataStore().put(goraKey, goraEdge);
188     }
189 
190     /**
191      * Each edge needs to be transformed into a Gora object to be sent to
192      * a specific data store.
193      *
194      * @param  edge   edge to be transformed into a Gora object
195      * @param  srcId  source vertex id
196      * @param  srcValue  source vertex value
197      * @return          Gora representation of the vertex
198      */
199     protected abstract Persistent getGoraEdge
200       (I srcId, V srcValue, Edge<I, E> edge);
201 
202     /**
203      * Gets the correct key from a computed vertex.
204      * @param edge  edge to extract the key from.
205      * @param  srcId  source vertex id
206      * @param  srcValue  source vertex value
207      * @return      The key representing such edge.
208      */
209     protected abstract Object getGoraKey(I srcId, V srcValue, Edge<I, E> edge);
210   }
211 
212   /**
213    * Gets the data store.
214    * @return DataStore
215    */
216   public static DataStore getDataStore() {
217     return DATA_STORE;
218   }
219 
220   /**
221    * Sets the data store
222    * @param dStore the dATA_STORE to set
223    */
224   public static void setDataStore(DataStore dStore) {
225     DATA_STORE = dStore;
226   }
227 
228   /**
229    * Gets the persistent Class
230    * @return persistentClass used
231    */
232   static Class<? extends Persistent> getPersistentClass() {
233     return PERSISTENT_CLASS;
234   }
235 
236   /**
237    * Sets the persistent Class
238    * @param persistentClassUsed to be set
239    */
240   static void setPersistentClass
241   (Class<? extends Persistent> persistentClassUsed) {
242     PERSISTENT_CLASS = persistentClassUsed;
243   }
244 
245   /**
246    * Gets the key class used.
247    * @return the key class used.
248    */
249   static Class<?> getKeyClass() {
250     return KEY_CLASS;
251   }
252 
253   /**
254    * Sets the key class used.
255    * @param keyClassUsed key class used.
256    */
257   static void setKeyClass(Class<?> keyClassUsed) {
258     KEY_CLASS = keyClassUsed;
259   }
260 
261   /**
262    * @return Class the DATASTORE_CLASS
263    */
264   public static Class<? extends DataStore> getDatastoreClass() {
265     return DATASTORE_CLASS;
266   }
267 
268   /**
269    * @param dataStoreClass the dataStore class to set
270    */
271   public static void setDatastoreClass(
272       Class<? extends DataStore> dataStoreClass) {
273     DATASTORE_CLASS = dataStoreClass;
274   }
275 
276   /**
277    * Gets the logger for the class.
278    * @return the log of the class.
279    */
280   public static Logger getLogger() {
281     return LOG;
282   }
283 }