This project has retired. For details please refer to its
Attic page.
GoraVertexOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.gora;
19
20 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
21 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
22 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
23
24 import java.io.IOException;
25
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.io.VertexOutputFormat;
28 import org.apache.giraph.io.VertexWriter;
29 import org.apache.giraph.io.gora.utils.GoraUtils;
30 import org.apache.gora.persistency.Persistent;
31 import org.apache.gora.store.DataStore;
32 import org.apache.gora.util.GoraException;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.io.WritableComparable;
36 import org.apache.hadoop.mapreduce.JobContext;
37 import org.apache.hadoop.mapreduce.OutputCommitter;
38 import org.apache.hadoop.mapreduce.TaskAttemptContext;
39 import org.apache.log4j.Logger;
40 import org.apache.zookeeper.WatchedEvent;
41 import org.apache.zookeeper.Watcher;
42 import org.apache.zookeeper.Watcher.Event.EventType;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public abstract class GoraVertexOutputFormat<
58 I extends WritableComparable,
59 V extends Writable,
60 E extends Writable>
61 extends VertexOutputFormat<I, V, E> {
62
63
64 private static final Logger LOG =
65 Logger.getLogger(GoraVertexOutputFormat.class);
66
67
68 private static Class<?> KEY_CLASS;
69
70
71 private static Class<? extends Persistent> PERSISTENT_CLASS;
72
73
74 private static Class<? extends DataStore> DATASTORE_CLASS;
75
76
77 private static DataStore DATA_STORE;
78
79
80
81
82
83
84
85
86 @Override
87 public void checkOutputSpecs(JobContext context)
88 throws IOException, InterruptedException {
89 }
90
91
92
93
94
95
96 public DataStore createDataStore(Configuration conf) {
97 DataStore dsCreated = null;
98 try {
99 dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
100 getKeyClass(), getPersistentClass());
101 } catch (GoraException e) {
102 getLogger().error("Error creating data store.");
103 e.printStackTrace();
104 }
105 return dsCreated;
106 }
107
108
109
110
111
112
113
114
115
116 @Override
117 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
118 throws IOException, InterruptedException {
119 return new NullOutputCommitter();
120 }
121
122
123
124
125 private static class NullOutputCommitter extends OutputCommitter {
126 @Override
127 public void abortTask(TaskAttemptContext arg0) throws IOException { }
128
129 @Override
130 public void commitTask(TaskAttemptContext arg0) throws IOException { }
131
132 @Override
133 public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
134 return false;
135 }
136
137 @Override
138 public void setupJob(JobContext arg0) throws IOException { }
139
140 @Override
141 public void setupTask(TaskAttemptContext arg0) throws IOException { }
142 }
143
144
145
146
147
148
149 protected abstract class GoraVertexWriter
150 extends VertexWriter<I, V, E>
151 implements Watcher {
152
153 private final Object lock = new Object();
154
155 @Override
156 public void initialize(TaskAttemptContext context)
157 throws IOException, InterruptedException {
158 String sDataStoreType =
159 GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
160 String sKeyType =
161 GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
162 String sPersistentType =
163 GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
164 try {
165 Class<?> keyClass = Class.forName(sKeyType);
166 Class<?> persistentClass = Class.forName(sPersistentType);
167 Class<?> dataStoreClass = Class.forName(sDataStoreType);
168 setKeyClass(keyClass);
169 setPersistentClass((Class<? extends Persistent>) persistentClass);
170 setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
171 setDataStore(createDataStore(context.getConfiguration()));
172 if (getDataStore() != null) {
173 getLogger().info("The output data store has been created.");
174 }
175 } catch (ClassNotFoundException e) {
176 getLogger().error("Error while reading Gora Output parameters");
177 e.printStackTrace();
178 }
179 }
180
181 @Override
182 public void close(TaskAttemptContext context)
183 throws IOException, InterruptedException {
184 getDataStore().flush();
185 getDataStore().close();
186 }
187
188 @Override
189 public void writeVertex(Vertex<I, V, E> vertex)
190 throws IOException, InterruptedException {
191 Persistent goraVertex = null;
192 Object goraKey = getGoraKey(vertex);
193 goraVertex = getGoraVertex(vertex);
194 getDataStore().put(goraKey, goraVertex);
195 }
196
197 @Override
198 public void process(WatchedEvent event) {
199 EventType type = event.getType();
200 if (type == EventType.NodeChildrenChanged) {
201 if (getLogger().isDebugEnabled()) {
202 getLogger().debug("signal: number of children changed.");
203 }
204 synchronized (lock) {
205 lock.notify();
206 }
207 }
208 }
209
210
211
212
213
214
215
216
217 protected abstract Persistent getGoraVertex(Vertex<I, V, E> vertex);
218
219
220
221
222
223
224 protected abstract Object getGoraKey(Vertex<I, V, E> vertex);
225
226 }
227
228
229
230
231
232 public static DataStore getDataStore() {
233 return DATA_STORE;
234 }
235
236
237
238
239
240 public static void setDataStore(DataStore dStore) {
241 DATA_STORE = dStore;
242 }
243
244
245
246
247
248 static Class<? extends Persistent> getPersistentClass() {
249 return PERSISTENT_CLASS;
250 }
251
252
253
254
255
256 static void setPersistentClass
257 (Class<? extends Persistent> persistentClassUsed) {
258 PERSISTENT_CLASS = persistentClassUsed;
259 }
260
261
262
263
264
265 static Class<?> getKeyClass() {
266 return KEY_CLASS;
267 }
268
269
270
271
272
273 static void setKeyClass(Class<?> keyClassUsed) {
274 KEY_CLASS = keyClassUsed;
275 }
276
277
278
279
280 public static Class<? extends DataStore> getDatastoreClass() {
281 return DATASTORE_CLASS;
282 }
283
284
285
286
287 public static void setDatastoreClass(
288 Class<? extends DataStore> dataStoreClass) {
289 DATASTORE_CLASS = dataStoreClass;
290 }
291
292
293
294
295
296 public static Logger getLogger() {
297 return LOG;
298 }
299 }