View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.gora;
19  
20  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
21  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
22  import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
23  
24  import java.io.IOException;
25  
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.io.VertexOutputFormat;
28  import org.apache.giraph.io.VertexWriter;
29  import org.apache.giraph.io.gora.utils.GoraUtils;
30  import org.apache.gora.persistency.Persistent;
31  import org.apache.gora.store.DataStore;
32  import org.apache.gora.util.GoraException;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.JobContext;
37  import org.apache.hadoop.mapreduce.OutputCommitter;
38  import org.apache.hadoop.mapreduce.TaskAttemptContext;
39  import org.apache.log4j.Logger;
40  import org.apache.zookeeper.WatchedEvent;
41  import org.apache.zookeeper.Watcher;
42  import org.apache.zookeeper.Watcher.Event.EventType;
43  /**
44   *
45   *  Class which wraps the GoraOutputFormat. It's designed
46   *  as an extension point to VertexOutputFormat subclasses who wish
47   *  to write vertices back to an Accumulo table.
48   *
49   *  Works with
50   *  {@link GoraVertexInputFormat}
51   *
52   *
53   * @param <I> vertex id type
54   * @param <V>  vertex value type
55   * @param <E>  edge type
56   */
57  public abstract class GoraVertexOutputFormat<
58          I extends WritableComparable,
59          V extends Writable,
60          E extends Writable>
61          extends VertexOutputFormat<I, V, E> {
62  
63    /** Logger for Gora's vertex input format. */
64    private static final Logger LOG =
65          Logger.getLogger(GoraVertexOutputFormat.class);
66  
67    /** KeyClass used for getting data. */
68    private static Class<?> KEY_CLASS;
69  
70    /** The vertex itself will be used as a value inside Gora. */
71    private static Class<? extends Persistent> PERSISTENT_CLASS;
72  
73    /** Data store class to be used as backend. */
74    private static Class<? extends DataStore> DATASTORE_CLASS;
75  
76    /** Data store used for querying data. */
77    private static DataStore DATA_STORE;
78  
79    /**
80     * checkOutputSpecs
81     *
82     * @param context information about the job
83     * @throws IOException
84     * @throws InterruptedException
85     */
86    @Override
87    public void checkOutputSpecs(JobContext context)
88      throws IOException, InterruptedException {
89    }
90  
91    /**
92     * Gets the data store object initialized.
93     * @param conf Configuration.
94     * @return DataStore created
95     */
96    public DataStore createDataStore(Configuration conf) {
97      DataStore dsCreated = null;
98      try {
99        dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
100           getKeyClass(), getPersistentClass());
101     } catch (GoraException e) {
102       getLogger().error("Error creating data store.");
103       e.printStackTrace();
104     }
105     return dsCreated;
106   }
107 
108   /**
109    * getOutputCommitter
110    *
111    * @param context the task context
112    * @return OutputCommitter
113    * @throws IOException
114    * @throws InterruptedException
115    */
116   @Override
117   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
118     throws IOException, InterruptedException {
119     return new NullOutputCommitter();
120   }
121 
122   /**
123    * Empty output commiter for hadoop.
124    */
125   private static class NullOutputCommitter extends OutputCommitter {
126     @Override
127     public void abortTask(TaskAttemptContext arg0) throws IOException {    }
128 
129     @Override
130     public void commitTask(TaskAttemptContext arg0) throws IOException {    }
131 
132     @Override
133     public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
134       return false;
135     }
136 
137     @Override
138     public void setupJob(JobContext arg0) throws IOException {    }
139 
140     @Override
141     public void setupTask(TaskAttemptContext arg0) throws IOException {    }
142   }
143 
144   /**
145    * Abstract class to be implemented by the user based on their specific
146    * vertex/edges output. Easiest to ignore the key value separator and only
147    * use key instead.
148    */
149   protected abstract class GoraVertexWriter
150     extends VertexWriter<I, V, E>
151     implements Watcher {
152     /** lock for management of the barrier */
153     private final Object lock = new Object();
154 
155     @Override
156     public void initialize(TaskAttemptContext context)
157       throws IOException, InterruptedException {
158       String sDataStoreType =
159         GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
160       String sKeyType =
161         GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
162       String sPersistentType =
163         GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
164       try {
165         Class<?> keyClass = Class.forName(sKeyType);
166         Class<?> persistentClass = Class.forName(sPersistentType);
167         Class<?> dataStoreClass = Class.forName(sDataStoreType);
168         setKeyClass(keyClass);
169         setPersistentClass((Class<? extends Persistent>) persistentClass);
170         setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
171         setDataStore(createDataStore(context.getConfiguration()));
172         if (getDataStore() != null) {
173           getLogger().info("The output data store has been created.");
174         }
175       } catch (ClassNotFoundException e) {
176         getLogger().error("Error while reading Gora Output parameters");
177         e.printStackTrace();
178       }
179     }
180 
181     @Override
182     public void close(TaskAttemptContext context)
183       throws IOException, InterruptedException {
184       getDataStore().flush();
185       getDataStore().close();
186     }
187 
188     @Override
189     public void writeVertex(Vertex<I, V, E> vertex)
190       throws IOException, InterruptedException {
191       Persistent goraVertex = null;
192       Object goraKey = getGoraKey(vertex);
193       goraVertex = getGoraVertex(vertex);
194       getDataStore().put(goraKey, goraVertex);
195     }
196 
197     @Override
198     public void process(WatchedEvent event) {
199       EventType type = event.getType();
200       if (type == EventType.NodeChildrenChanged) {
201         if (getLogger().isDebugEnabled()) {
202           getLogger().debug("signal: number of children changed.");
203         }
204         synchronized (lock) {
205           lock.notify();
206         }
207       }
208     }
209 
210     /**
211      * Each vertex needs to be transformed into a Gora object to be sent to
212      * a specific data store.
213      *
214      * @param  vertex   vertex to be transformed into a Gora object
215      * @return          Gora representation of the vertex
216      */
217     protected abstract Persistent getGoraVertex(Vertex<I, V, E> vertex);
218 
219     /**
220      * Gets the correct key from a computed vertex.
221      * @param vertex  vertex to extract the key from.
222      * @return        The key representing such vertex
223      */
224     protected abstract Object getGoraKey(Vertex<I, V, E> vertex);
225 
226   }
227 
228   /**
229    * Gets the data store.
230    * @return DataStore
231    */
232   public static DataStore getDataStore() {
233     return DATA_STORE;
234   }
235 
236   /**
237    * Sets the data store
238    * @param dStore the dATA_STORE to set
239    */
240   public static void setDataStore(DataStore dStore) {
241     DATA_STORE = dStore;
242   }
243 
244   /**
245    * Gets the persistent Class
246    * @return persistentClass used
247    */
248   static Class<? extends Persistent> getPersistentClass() {
249     return PERSISTENT_CLASS;
250   }
251 
252   /**
253    * Sets the persistent Class
254    * @param persistentClassUsed to be set
255    */
256   static void setPersistentClass
257   (Class<? extends Persistent> persistentClassUsed) {
258     PERSISTENT_CLASS = persistentClassUsed;
259   }
260 
261   /**
262    * Gets the key class used.
263    * @return the key class used.
264    */
265   static Class<?> getKeyClass() {
266     return KEY_CLASS;
267   }
268 
269   /**
270    * Sets the key class used.
271    * @param keyClassUsed key class used.
272    */
273   static void setKeyClass(Class<?> keyClassUsed) {
274     KEY_CLASS = keyClassUsed;
275   }
276 
277   /**
278    * @return Class the DATASTORE_CLASS
279    */
280   public static Class<? extends DataStore> getDatastoreClass() {
281     return DATASTORE_CLASS;
282   }
283 
284   /**
285    * @param dataStoreClass the dataStore class to set
286    */
287   public static void setDatastoreClass(
288       Class<? extends DataStore> dataStoreClass) {
289     DATASTORE_CLASS = dataStoreClass;
290   }
291 
292   /**
293    * Returns a logger.
294    * @return the log for the output format.
295    */
296   public static Logger getLogger() {
297     return LOG;
298   }
299 }