This project has been retired. For details, please refer to its
Attic page.
WrappedEdgeOutputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.internal;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.io.EdgeOutputFormat;
26 import org.apache.giraph.io.EdgeWriter;
27 import org.apache.giraph.job.HadoopUtils;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.JobContext;
31 import org.apache.hadoop.mapreduce.JobStatus;
32 import org.apache.hadoop.mapreduce.OutputCommitter;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34
35
36
37
38
39
40
41
42
43
44
45
46
47 @SuppressWarnings("rawtypes")
48 public class WrappedEdgeOutputFormat<I extends WritableComparable,
49 V extends Writable, E extends Writable>
50 extends EdgeOutputFormat<I, V, E> {
51
52
53 private final EdgeOutputFormat<I, V, E> originalOutputFormat;
54
55
56
57
58
59
60 public WrappedEdgeOutputFormat(
61 EdgeOutputFormat<I, V, E> edgeOutputFormat) {
62 originalOutputFormat = edgeOutputFormat;
63 }
64
65 @Override
66 public EdgeWriter<I, V, E> createEdgeWriter(
67 TaskAttemptContext context) throws IOException, InterruptedException {
68 final EdgeWriter<I, V, E> edgeWriter =
69 originalOutputFormat.createEdgeWriter(
70 HadoopUtils.makeTaskAttemptContext(getConf(), context));
71 return new EdgeWriter<I, V, E>() {
72 @Override
73 public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
74 super.setConf(conf);
75 edgeWriter.setConf(conf);
76 }
77
78 @Override
79 public void initialize(TaskAttemptContext context)
80 throws IOException, InterruptedException {
81 edgeWriter.initialize(
82 HadoopUtils.makeTaskAttemptContext(getConf(), context));
83 }
84
85 @Override
86 public void close(
87 TaskAttemptContext context) throws IOException, InterruptedException {
88 edgeWriter.close(
89 HadoopUtils.makeTaskAttemptContext(getConf(), context));
90 }
91
92 @Override
93 public void writeEdge(I sourceId, V sourceValue, Edge<I, E> edge)
94 throws IOException, InterruptedException {
95 edgeWriter.writeEdge(sourceId, sourceValue, edge);
96 }
97 };
98 }
99
100 @Override
101 public void checkOutputSpecs(JobContext context)
102 throws IOException, InterruptedException {
103 originalOutputFormat.checkOutputSpecs(
104 HadoopUtils.makeJobContext(getConf(), context));
105 }
106
107 @Override
108 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
109 throws IOException, InterruptedException {
110
111 final OutputCommitter outputCommitter =
112 originalOutputFormat.getOutputCommitter(
113 HadoopUtils.makeTaskAttemptContext(getConf(), context));
114
115 return new OutputCommitter() {
116 @Override
117 public void setupJob(JobContext context) throws IOException {
118 outputCommitter.setupJob(
119 HadoopUtils.makeJobContext(getConf(), context));
120 }
121
122 @Override
123 public void setupTask(TaskAttemptContext context) throws IOException {
124 outputCommitter.setupTask(
125 HadoopUtils.makeTaskAttemptContext(getConf(), context));
126 }
127
128 @Override
129 public boolean needsTaskCommit(
130 TaskAttemptContext context) throws IOException {
131 return outputCommitter.needsTaskCommit(
132 HadoopUtils.makeTaskAttemptContext(getConf(), context));
133 }
134
135 @Override
136 public void commitTask(TaskAttemptContext context) throws IOException {
137 outputCommitter.commitTask(
138 HadoopUtils.makeTaskAttemptContext(getConf(), context));
139 }
140
141 @Override
142 public void abortTask(TaskAttemptContext context) throws IOException {
143 outputCommitter.abortTask(
144 HadoopUtils.makeTaskAttemptContext(getConf(), context));
145 }
146
147 @Override
148 public void cleanupJob(JobContext context) throws IOException {
149 outputCommitter.cleanupJob(
150 HadoopUtils.makeJobContext(getConf(), context));
151 }
152
153 @Override
154 public void commitJob(JobContext context) throws IOException {
155 outputCommitter.commitJob(
156 HadoopUtils.makeJobContext(getConf(), context));
157 }
158
159 @Override
160 public void abortJob(JobContext context,
161 JobStatus.State state) throws IOException {
162 outputCommitter.abortJob(
163 HadoopUtils.makeJobContext(getConf(), context), state);
164 }
165 };
166 }
167
168 @Override
169 public void preWriting(TaskAttemptContext context) {
170 originalOutputFormat.preWriting(context);
171 }
172
173 @Override
174 public void postWriting(TaskAttemptContext context) {
175 originalOutputFormat.postWriting(context);
176 }
177 }