This project has retired. For details please refer to its
Attic page.
MultiThreadedSuperstepOutput xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.superstep_output;
20
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.io.SimpleVertexWriter;
23 import org.apache.giraph.io.VertexOutputFormat;
24 import org.apache.giraph.io.VertexWriter;
25 import org.apache.giraph.utils.CallableFactory;
26 import org.apache.giraph.utils.ProgressableUtils;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.Mapper;
30
31 import com.google.common.collect.Lists;
32 import com.google.common.collect.Sets;
33
34 import java.io.IOException;
35 import java.util.List;
36 import java.util.Set;
37 import java.util.concurrent.Callable;
38
39
40
41
42
43
44
45
46
47 public class MultiThreadedSuperstepOutput<I extends WritableComparable,
48 V extends Writable, E extends Writable> implements
49 SuperstepOutput<I, V, E> {
50
51 private final Mapper<?, ?, ?, ?>.Context context;
52
53 private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
54
55 private final VertexOutputFormat<I, V, E> vertexOutputFormat;
56
57
58
59
60 private final List<VertexWriter<I, V, E>> availableVertexWriters;
61
62 private final Set<VertexWriter<I, V, E>> occupiedVertexWriters;
63
64
65
66
67
68
69
70 public MultiThreadedSuperstepOutput(
71 ImmutableClassesGiraphConfiguration<I, V, E> conf,
72 Mapper<?, ?, ?, ?>.Context context) {
73 this.configuration = conf;
74 vertexOutputFormat = conf.createWrappedVertexOutputFormat();
75 this.context = context;
76 availableVertexWriters = Lists.newArrayList();
77 occupiedVertexWriters = Sets.newHashSet();
78 vertexOutputFormat.preWriting(context);
79 }
80
81 @Override
82 public synchronized SimpleVertexWriter<I, V, E> getVertexWriter() {
83 VertexWriter<I, V, E> vertexWriter;
84 if (availableVertexWriters.isEmpty()) {
85 try {
86 vertexWriter = vertexOutputFormat.createVertexWriter(context);
87 vertexWriter.setConf(configuration);
88 vertexWriter.initialize(context);
89 } catch (IOException e) {
90 throw new IllegalStateException("getVertexWriter: " +
91 "IOException occurred", e);
92 } catch (InterruptedException e) {
93 throw new IllegalStateException("getVertexWriter: " +
94 "InterruptedException occurred", e);
95 }
96 } else {
97 vertexWriter =
98 availableVertexWriters.remove(availableVertexWriters.size() - 1);
99 }
100 occupiedVertexWriters.add(vertexWriter);
101 return vertexWriter;
102 }
103
104 @Override
105 public synchronized void returnVertexWriter(
106 SimpleVertexWriter<I, V, E> vertexWriter) {
107 VertexWriter<I, V, E> returnedWriter = (VertexWriter<I, V, E>) vertexWriter;
108 if (!occupiedVertexWriters.remove(returnedWriter)) {
109 throw new IllegalStateException("returnVertexWriter: " +
110 "Returned vertex writer which is not currently occupied!");
111 }
112 availableVertexWriters.add(returnedWriter);
113 }
114
115 @Override
116 public synchronized void postApplication() throws IOException,
117 InterruptedException {
118 if (!occupiedVertexWriters.isEmpty()) {
119 throw new IllegalStateException("postApplication: " +
120 occupiedVertexWriters.size() +
121 " vertex writers were not returned!");
122 }
123
124
125 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
126 @Override
127 public Callable<Void> newCallable(int callableId) {
128 return new Callable<Void>() {
129 @Override
130 public Void call() throws Exception {
131 while (true) {
132 VertexWriter<I, V, E> vertexWriter;
133 synchronized (availableVertexWriters) {
134 if (availableVertexWriters.isEmpty()) {
135 return null;
136 }
137 vertexWriter = availableVertexWriters.remove(
138 availableVertexWriters.size() - 1);
139 }
140 vertexWriter.close(context);
141 }
142 }
143 };
144 }
145 };
146 ProgressableUtils.getResultsWithNCallables(callableFactory,
147 Math.min(configuration.getNumOutputThreads(),
148 availableVertexWriters.size()), "close-writers-%d", context);
149 vertexOutputFormat.postWriting(context);
150 }
151 }