This project has been retired. For details, please refer to its
Attic page.
WrappedVertexOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.internal;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.io.VertexOutputFormat;
24 import org.apache.giraph.io.VertexWriter;
25 import org.apache.giraph.job.HadoopUtils;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.JobContext;
29 import org.apache.hadoop.mapreduce.JobStatus;
30 import org.apache.hadoop.mapreduce.OutputCommitter;
31 import org.apache.hadoop.mapreduce.TaskAttemptContext;
32
33 import java.io.IOException;
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public class WrappedVertexOutputFormat<I extends WritableComparable,
48 V extends Writable, E extends Writable>
49 extends VertexOutputFormat<I, V, E> {
50
51 private VertexOutputFormat<I, V, E> originalOutputFormat;
52
53
54
55
56
57
58 public WrappedVertexOutputFormat(
59 VertexOutputFormat<I, V, E> vertexOutputFormat) {
60 originalOutputFormat = vertexOutputFormat;
61 }
62
63 @Override
64 public VertexWriter<I, V, E> createVertexWriter(
65 TaskAttemptContext context) throws IOException, InterruptedException {
66 final VertexWriter<I, V, E> vertexWriter =
67 originalOutputFormat.createVertexWriter(
68 HadoopUtils.makeTaskAttemptContext(getConf(), context));
69 return new VertexWriter<I, V, E>() {
70 @Override
71 public void setConf(
72 ImmutableClassesGiraphConfiguration<I, V, E> conf) {
73 super.setConf(conf);
74 vertexWriter.setConf(conf);
75 }
76
77 @Override
78 public void initialize(
79 TaskAttemptContext context) throws IOException, InterruptedException {
80 vertexWriter.initialize(
81 HadoopUtils.makeTaskAttemptContext(getConf(), context));
82 }
83
84 @Override
85 public void close(
86 TaskAttemptContext context) throws IOException, InterruptedException {
87 vertexWriter.close(
88 HadoopUtils.makeTaskAttemptContext(getConf(), context));
89 }
90
91 @Override
92 public void writeVertex(
93 Vertex<I, V, E> vertex) throws IOException, InterruptedException {
94 vertexWriter.writeVertex(vertex);
95 }
96 };
97 }
98
99 @Override
100 public void checkOutputSpecs(
101 JobContext context) throws IOException, InterruptedException {
102 originalOutputFormat.checkOutputSpecs(
103 HadoopUtils.makeJobContext(getConf(), context));
104 }
105
106 @Override
107 public OutputCommitter getOutputCommitter(
108 TaskAttemptContext context) throws IOException, InterruptedException {
109 final OutputCommitter outputCommitter =
110 originalOutputFormat.getOutputCommitter(
111 HadoopUtils.makeTaskAttemptContext(getConf(), context));
112 return new OutputCommitter() {
113 @Override
114 public void setupJob(JobContext context) throws IOException {
115 outputCommitter.setupJob(
116 HadoopUtils.makeJobContext(getConf(), context));
117 }
118
119 @Override
120 public void setupTask(TaskAttemptContext context) throws IOException {
121 outputCommitter.setupTask(
122 HadoopUtils.makeTaskAttemptContext(getConf(), context));
123 }
124
125 @Override
126 public boolean needsTaskCommit(
127 TaskAttemptContext context) throws IOException {
128 return outputCommitter.needsTaskCommit(
129 HadoopUtils.makeTaskAttemptContext(getConf(), context));
130 }
131
132 @Override
133 public void commitTask(TaskAttemptContext context) throws IOException {
134 outputCommitter.commitTask(
135 HadoopUtils.makeTaskAttemptContext(getConf(), context));
136 }
137
138 @Override
139 public void abortTask(TaskAttemptContext context) throws IOException {
140 outputCommitter.abortTask(
141 HadoopUtils.makeTaskAttemptContext(getConf(), context));
142 }
143
144 @Override
145 public void cleanupJob(JobContext context) throws IOException {
146 outputCommitter.cleanupJob(
147 HadoopUtils.makeJobContext(getConf(), context));
148 }
149
150 @Override
151 public void commitJob(JobContext context) throws IOException {
152 outputCommitter.commitJob(
153 HadoopUtils.makeJobContext(getConf(), context));
154 }
155
156 @Override
157 public void abortJob(JobContext context,
158 JobStatus.State state) throws IOException {
159 outputCommitter.abortJob(
160 HadoopUtils.makeJobContext(getConf(), context), state);
161 }
162 };
163 }
164
165 @Override
166 public void preWriting(TaskAttemptContext context) {
167 originalOutputFormat.preWriting(context);
168 }
169
170 @Override
171 public void postWriting(TaskAttemptContext context) {
172 originalOutputFormat.postWriting(context);
173 }
174 }