This project has retired. For details please refer to its
Attic page.
HCatalogVertexOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.hcatalog;
20
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.giraph.io.VertexOutputFormat;
23 import org.apache.giraph.io.VertexWriter;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.JobContext;
27 import org.apache.hadoop.mapreduce.OutputCommitter;
28 import org.apache.hadoop.mapreduce.RecordWriter;
29 import org.apache.hadoop.mapreduce.TaskAttemptContext;
30 import org.apache.hcatalog.data.DefaultHCatRecord;
31 import org.apache.hcatalog.data.HCatRecord;
32 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
33
34 import java.io.IOException;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 @SuppressWarnings("rawtypes")
54 public abstract class HCatalogVertexOutputFormat<
55 I extends WritableComparable,
56 V extends Writable,
57 E extends Writable>
58 extends VertexOutputFormat<I, V, E> {
59
60
61
62 protected HCatOutputFormat hCatOutputFormat = new HCatOutputFormat();
63
64 @Override
65 public final void checkOutputSpecs(JobContext context) throws IOException,
66 InterruptedException {
67 hCatOutputFormat.checkOutputSpecs(context);
68 }
69
70 @Override
71 public final OutputCommitter getOutputCommitter(TaskAttemptContext context)
72 throws IOException, InterruptedException {
73 return hCatOutputFormat.getOutputCommitter(context);
74 }
75
76
77
78
79
80
81
82
83
84
85
86
87 protected abstract static class HCatalogVertexWriter<
88 I extends WritableComparable,
89 V extends Writable,
90 E extends Writable>
91 extends VertexWriter<I, V, E> {
92
93
94 private RecordWriter<WritableComparable<?>, HCatRecord> hCatRecordWriter;
95
96 private TaskAttemptContext context;
97
98
99
100
101
102
103 protected void initialize(
104 RecordWriter<WritableComparable<?>,
105 HCatRecord> hCatRecordWriter) {
106 this.hCatRecordWriter = hCatRecordWriter;
107 }
108
109
110
111
112
113 protected RecordWriter<WritableComparable<?>,
114 HCatRecord> getRecordWriter() {
115 return hCatRecordWriter;
116 }
117
118
119
120
121
122
123 protected TaskAttemptContext getContext() {
124 return context;
125 }
126
127 @Override
128 public void initialize(TaskAttemptContext context) throws IOException {
129 this.context = context;
130 }
131
132 @Override
133 public void close(TaskAttemptContext context) throws IOException,
134 InterruptedException {
135 hCatRecordWriter.close(context);
136 }
137
138 }
139
140
141
142
143
144 protected abstract HCatalogVertexWriter<I, V, E> createVertexWriter();
145
146 @Override
147 public final VertexWriter<I, V, E> createVertexWriter(
148 TaskAttemptContext context) throws IOException,
149 InterruptedException {
150 HCatalogVertexWriter<I, V, E> writer = createVertexWriter();
151 writer.initialize(hCatOutputFormat.getRecordWriter(context));
152 return writer;
153 }
154
155
156
157
158
159
160
161
162 protected abstract static class SingleRowHCatalogVertexWriter<
163 I extends WritableComparable,
164 V extends Writable,
165 E extends Writable>
166 extends HCatalogVertexWriter<I, V, E> {
167
168
169
170
171 protected abstract int getNumColumns();
172
173
174
175
176
177
178 protected abstract void fillRecord(HCatRecord record,
179 Vertex<I, V, E> vertex);
180
181
182
183
184
185
186 protected HCatRecord createRecord(Vertex<I, V, E> vertex) {
187 HCatRecord record = new DefaultHCatRecord(getNumColumns());
188 fillRecord(record, vertex);
189 return record;
190 }
191
192 @Override
193 public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
194 InterruptedException {
195 getRecordWriter().write(null, createRecord(vertex));
196 }
197
198 }
199
200
201
202
203
204
205
206
207 public abstract static class MultiRowHCatalogVertexWriter<
208 I extends WritableComparable,
209 V extends Writable,
210 E extends Writable>
211 extends HCatalogVertexWriter<I, V, E> {
212
213
214
215
216
217 protected abstract Iterable<HCatRecord> createRecords(
218 Vertex<I, V, E> vertex);
219
220 @Override
221 public final void writeVertex(Vertex<I, V, E> vertex) throws IOException,
222 InterruptedException {
223 Iterable<HCatRecord> records = createRecords(vertex);
224 for (HCatRecord record : records) {
225 getRecordWriter().write(null, record);
226 }
227 }
228 }
229 }