This project has retired. For details please refer to its
Attic page.
GoraEdgeOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.gora;
19
20 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
21 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
22 import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
23
24 import java.io.IOException;
25
26 import org.apache.giraph.edge.Edge;
27 import org.apache.giraph.io.EdgeOutputFormat;
28 import org.apache.giraph.io.EdgeWriter;
29 import org.apache.giraph.io.gora.utils.GoraUtils;
30 import org.apache.gora.persistency.Persistent;
31 import org.apache.gora.store.DataStore;
32 import org.apache.gora.util.GoraException;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.io.WritableComparable;
36 import org.apache.hadoop.mapreduce.JobContext;
37 import org.apache.hadoop.mapreduce.OutputCommitter;
38 import org.apache.hadoop.mapreduce.TaskAttemptContext;
39 import org.apache.log4j.Logger;
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public abstract class GoraEdgeOutputFormat<I extends WritableComparable,
54 V extends Writable, E extends Writable>
55 extends EdgeOutputFormat<I, V, E> {
56
57
58 private static final Logger LOG =
59 Logger.getLogger(GoraEdgeOutputFormat.class);
60
61
62 private static Class<?> KEY_CLASS;
63
64
65 private static Class<? extends Persistent> PERSISTENT_CLASS;
66
67
68 private static Class<? extends DataStore> DATASTORE_CLASS;
69
70
71 private static DataStore DATA_STORE;
72
73
74
75
76
77
78
79
80 @Override
81 public void checkOutputSpecs(JobContext context)
82 throws IOException, InterruptedException {
83 }
84
85
86
87
88
89
90 public DataStore createDataStore(Configuration conf) {
91 DataStore dsCreated = null;
92 try {
93 dsCreated = GoraUtils.createSpecificDataStore(conf, getDatastoreClass(),
94 getKeyClass(), getPersistentClass());
95 } catch (GoraException e) {
96 getLogger().error("Error creating data store.");
97 e.printStackTrace();
98 }
99 return dsCreated;
100 }
101
102 @Override
103 public abstract GoraEdgeWriter
104 createEdgeWriter(TaskAttemptContext context)
105 throws IOException, InterruptedException;
106
107
108
109
110
111
112
113
114
115 @Override
116 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
117 throws IOException, InterruptedException {
118 return new NullOutputCommitter();
119 }
120
121
122
123
124 private static class NullOutputCommitter extends OutputCommitter {
125 @Override
126 public void abortTask(TaskAttemptContext arg0) throws IOException { }
127
128 @Override
129 public void commitTask(TaskAttemptContext arg0) throws IOException { }
130
131 @Override
132 public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
133 return false;
134 }
135
136 @Override
137 public void setupJob(JobContext arg0) throws IOException { }
138
139 @Override
140 public void setupTask(TaskAttemptContext arg0) throws IOException { }
141 }
142
143
144
145
146
147 protected abstract class GoraEdgeWriter extends EdgeWriter<I, V, E> {
148 @Override
149 public void initialize(TaskAttemptContext context) throws IOException,
150 InterruptedException {
151 String sDataStoreType =
152 GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
153 String sKeyType =
154 GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
155 String sPersistentType =
156 GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
157 try {
158 Class<?> keyClass = Class.forName(sKeyType);
159 Class<?> persistentClass = Class.forName(sPersistentType);
160 Class<?> dataStoreClass = Class.forName(sDataStoreType);
161 setKeyClass(keyClass);
162 setPersistentClass((Class<? extends Persistent>) persistentClass);
163 setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
164 setDataStore(createDataStore(context.getConfiguration()));
165 if (getDataStore() != null) {
166 getLogger().debug("The data store has been created.");
167 }
168 } catch (ClassNotFoundException e) {
169 getLogger().error("Error while reading Gora Output parameters");
170 e.printStackTrace();
171 }
172 }
173
174 @Override
175 public void close(TaskAttemptContext context)
176 throws IOException, InterruptedException {
177 getDataStore().flush();
178 getDataStore().close();
179 }
180
181 @Override
182 public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
183 throws IOException, InterruptedException {
184 Persistent goraEdge = null;
185 Object goraKey = getGoraKey(srcId, srcValue, edge);
186 goraEdge = getGoraEdge(srcId, srcValue, edge);
187 getDataStore().put(goraKey, goraEdge);
188 }
189
190
191
192
193
194
195
196
197
198
199 protected abstract Persistent getGoraEdge
200 (I srcId, V srcValue, Edge<I, E> edge);
201
202
203
204
205
206
207
208
209 protected abstract Object getGoraKey(I srcId, V srcValue, Edge<I, E> edge);
210 }
211
212
213
214
215
216 public static DataStore getDataStore() {
217 return DATA_STORE;
218 }
219
220
221
222
223
224 public static void setDataStore(DataStore dStore) {
225 DATA_STORE = dStore;
226 }
227
228
229
230
231
232 static Class<? extends Persistent> getPersistentClass() {
233 return PERSISTENT_CLASS;
234 }
235
236
237
238
239
240 static void setPersistentClass
241 (Class<? extends Persistent> persistentClassUsed) {
242 PERSISTENT_CLASS = persistentClassUsed;
243 }
244
245
246
247
248
249 static Class<?> getKeyClass() {
250 return KEY_CLASS;
251 }
252
253
254
255
256
257 static void setKeyClass(Class<?> keyClassUsed) {
258 KEY_CLASS = keyClassUsed;
259 }
260
261
262
263
264 public static Class<? extends DataStore> getDatastoreClass() {
265 return DATASTORE_CLASS;
266 }
267
268
269
270
271 public static void setDatastoreClass(
272 Class<? extends DataStore> dataStoreClass) {
273 DATASTORE_CLASS = dataStoreClass;
274 }
275
276
277
278
279
280 public static Logger getLogger() {
281 return LOG;
282 }
283 }