This project has retired. For details please refer to its
Attic page.
AbstractEdgeStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge;
20
21 import com.google.common.collect.MapMaker;
22 import org.apache.giraph.bsp.CentralizedServiceWorker;
23 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
24 import org.apache.giraph.conf.GiraphConstants;
25 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26 import org.apache.giraph.graph.Vertex;
27 import org.apache.giraph.ooc.OutOfCoreEngine;
28 import org.apache.giraph.partition.Partition;
29 import org.apache.giraph.utils.CallableFactory;
30 import org.apache.giraph.utils.ProgressCounter;
31 import org.apache.giraph.utils.ProgressableUtils;
32 import org.apache.giraph.utils.ThreadLocalProgressCounter;
33 import org.apache.giraph.utils.Trimmable;
34 import org.apache.giraph.utils.VertexIdEdgeIterator;
35 import org.apache.giraph.utils.VertexIdEdges;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.io.WritableComparable;
38 import org.apache.hadoop.util.Progressable;
39 import org.apache.log4j.Logger;
40
41 import java.io.DataInput;
42 import java.io.DataOutput;
43 import java.io.IOException;
44 import java.util.Iterator;
45 import java.util.Map;
46 import java.util.concurrent.Callable;
47 import java.util.concurrent.ConcurrentMap;
48
49 import static com.google.common.base.Preconditions.checkState;
50
51
52
53
54
55
56
57
58
59
60
61 public abstract class AbstractEdgeStore<I extends WritableComparable,
62 V extends Writable, E extends Writable, K, Et>
63 extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
64 implements EdgeStore<I, V, E> {
65
66 public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
67 new ThreadLocalProgressCounter();
68
69 private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
70
71 protected CentralizedServiceWorker<I, V, E> service;
72
73 protected ImmutableClassesGiraphConfiguration<I, V, E> configuration;
74
75 protected Progressable progressable;
76
77 protected ConcurrentMap<Integer, Map<K, OutEdges<I, E>>> transientEdges;
78
79
80
81
82 protected boolean reuseEdgeObjects;
83
84
85
86
87 protected boolean useInputOutEdges;
88
89 private volatile boolean hasEdgesOnDisk = false;
90
91 private CreateSourceVertexCallback<I> createSourceVertexCallback;
92
93
94
95
96
97
98
99
100
101 public AbstractEdgeStore(
102 CentralizedServiceWorker<I, V, E> service,
103 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
104 Progressable progressable) {
105 this.service = service;
106 this.configuration = configuration;
107 this.progressable = progressable;
108 transientEdges = new MapMaker().concurrencyLevel(
109 configuration.getNettyServerExecutionConcurrency()).makeMap();
110 reuseEdgeObjects = configuration.reuseEdgeObjects();
111 useInputOutEdges = configuration.useInputOutEdges();
112 createSourceVertexCallback =
113 GiraphConstants.CREATE_EDGE_SOURCE_VERTICES_CALLBACK
114 .newInstance(configuration);
115 }
116
117
118
119
120
121
122
123
124 protected abstract I getVertexId(Et entry, I representativeVertexId);
125
126
127
128
129
130
131
132 protected abstract I createVertexId(Et entry);
133
134
135
136
137
138
139
140 protected abstract Map<K, OutEdges<I, E>> getPartitionEdges(int partitionId);
141
142
143
144
145
146
147
148 protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
149
150
151
152
153
154
155
156 protected abstract void writeVertexKey(K key, DataOutput output)
157 throws IOException;
158
159
160
161
162
163
164
165 protected abstract K readVertexKey(DataInput input) throws IOException;
166
167
168
169
170
171
172
173 protected abstract Iterator<Et>
174 getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
175
176 @Override
177 public boolean hasEdgesForPartition(int partitionId) {
178 return transientEdges.containsKey(partitionId);
179 }
180
181 @Override
182 public void writePartitionEdgeStore(int partitionId, DataOutput output)
183 throws IOException {
184 Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
185 if (edges != null) {
186 output.writeInt(edges.size());
187 if (edges.size() > 0) {
188 hasEdgesOnDisk = true;
189 }
190 for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
191 writeVertexKey(edge.getKey(), output);
192 edge.getValue().write(output);
193 }
194 }
195 }
196
197 @Override
198 public void readPartitionEdgeStore(int partitionId, DataInput input)
199 throws IOException {
200 checkState(!transientEdges.containsKey(partitionId),
201 "readPartitionEdgeStore: reading a partition that is already there in" +
202 " the partition store (impossible)");
203 Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
204 int numEntries = input.readInt();
205 for (int i = 0; i < numEntries; ++i) {
206 K vertexKey = readVertexKey(input);
207 OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
208 edges.readFields(input);
209 partitionEdges.put(vertexKey, edges);
210 }
211 }
212
213
214
215
216
217
218
219
220 protected abstract OutEdges<I, E> getVertexOutEdges(
221 VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
222 Map<K, OutEdges<I, E>> partitionEdgesIn);
223
224 @Override
225 public void addPartitionEdges(
226 int partitionId, VertexIdEdges<I, E> edges) {
227 Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
228
229 VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
230 edges.getVertexIdEdgeIterator();
231 while (vertexIdEdgeIterator.hasNext()) {
232 vertexIdEdgeIterator.next();
233 Edge<I, E> edge = reuseEdgeObjects ?
234 vertexIdEdgeIterator.getCurrentEdge() :
235 vertexIdEdgeIterator.releaseCurrentEdge();
236 OutEdges<I, E> outEdges = getVertexOutEdges(vertexIdEdgeIterator,
237 partitionEdges);
238 synchronized (outEdges) {
239 outEdges.add(edge);
240 }
241 }
242 }
243
244
245
246
247
248
249
250
251 private OutEdges<I, E> convertInputToComputeEdges(
252 OutEdges<I, E> inputEdges) {
253 if (!useInputOutEdges) {
254 return inputEdges;
255 } else {
256 return configuration.createAndInitializeOutEdges(inputEdges);
257 }
258 }
259
260 @Override
261 public void moveEdgesToVertices() {
262 if (transientEdges.isEmpty() && !hasEdgesOnDisk) {
263 if (LOG.isInfoEnabled()) {
264 LOG.info("moveEdgesToVertices: No edges to move");
265 }
266 return;
267 }
268
269 if (LOG.isInfoEnabled()) {
270 LOG.info("moveEdgesToVertices: Moving incoming edges to " +
271 "vertices. Using " + createSourceVertexCallback);
272 }
273
274 service.getPartitionStore().startIteration();
275 int numThreads = configuration.getNumInputSplitsThreads();
276
277 CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
278 @Override
279 public Callable<Void> newCallable(int callableId) {
280 return new Callable<Void>() {
281 @Override
282 public Void call() throws Exception {
283 I representativeVertexId = configuration.createVertexId();
284 OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
285 if (oocEngine != null) {
286 oocEngine.processingThreadStart();
287 }
288 ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
289 while (true) {
290 Partition<I, V, E> partition =
291 service.getPartitionStore().getNextPartition();
292 if (partition == null) {
293 break;
294 }
295 Map<K, OutEdges<I, E>> partitionEdges =
296 transientEdges.remove(partition.getId());
297 if (partitionEdges == null) {
298 service.getPartitionStore().putPartition(partition);
299 continue;
300 }
301
302 Iterator<Et> iterator =
303 getPartitionEdgesIterator(partitionEdges);
304
305 int count = 0;
306 while (iterator.hasNext()) {
307
308
309
310 if (oocEngine != null &&
311 (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
312 oocEngine.activeThreadCheckIn();
313 }
314 Et entry = iterator.next();
315 I vertexId = getVertexId(entry, representativeVertexId);
316 OutEdges<I, E> outEdges = convertInputToComputeEdges(
317 getPartitionEdges(entry));
318 Vertex<I, V, E> vertex = partition.getVertex(vertexId);
319
320
321 if (vertex == null) {
322 if (createSourceVertexCallback
323 .shouldCreateSourceVertex(vertexId)) {
324
325 vertex = configuration.createVertex();
326 vertex.initialize(createVertexId(entry),
327 configuration.createVertexValue(), outEdges);
328 partition.putVertex(vertex);
329 }
330 } else {
331
332
333 if (vertex.getNumEdges() == 0) {
334 vertex.setEdges(outEdges);
335 } else {
336 for (Edge<I, E> edge : outEdges) {
337 vertex.addEdge(edge);
338 }
339 }
340 if (vertex instanceof Trimmable) {
341 ((Trimmable) vertex).trim();
342 }
343
344
345 partition.saveVertex(vertex);
346 }
347 numVerticesProcessed.inc();
348 iterator.remove();
349 }
350
351
352
353 service.getPartitionStore().putPartition(partition);
354 }
355 if (oocEngine != null) {
356 oocEngine.processingThreadFinish();
357 }
358 return null;
359 }
360 };
361 }
362 };
363 ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads,
364 "move-edges-%d", progressable);
365
366
367 transientEdges.clear();
368
369 if (LOG.isInfoEnabled()) {
370 LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
371 "vertices.");
372 }
373 }
374 }