This project has retired. For details please refer to its
Attic page.
SimpleEdgeStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.utils.VertexIdEdgeIterator;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.util.Progressable;
27
28 import com.google.common.collect.MapMaker;
29
30 import java.io.DataInput;
31 import java.io.DataOutput;
32 import java.io.IOException;
33 import java.util.Iterator;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentMap;
36
37
38
39
40
41
42
43
44 public class SimpleEdgeStore<I extends WritableComparable,
45 V extends Writable, E extends Writable>
46 extends AbstractEdgeStore<I, V, E, I,
47 Map.Entry<I, OutEdges<I, E>>> {
48
49
50
51
52
53
54
55
56 public SimpleEdgeStore(
57 CentralizedServiceWorker<I, V, E> service,
58 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
59 Progressable progressable) {
60 super(service, configuration, progressable);
61 }
62
63 @Override
64 protected I getVertexId(Map.Entry<I, OutEdges<I, E>> entry,
65 I representativeVertexId) {
66 return entry.getKey();
67 }
68
69 @Override
70 protected I createVertexId(Map.Entry<I, OutEdges<I, E>> entry) {
71 return entry.getKey();
72 }
73
74 @Override
75 protected ConcurrentMap<I, OutEdges<I, E>> getPartitionEdges(
76 int partitionId) {
77 ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
78 (ConcurrentMap<I, OutEdges<I, E>>) transientEdges.get(partitionId);
79 if (partitionEdges == null) {
80 ConcurrentMap<I, OutEdges<I, E>> newPartitionEdges =
81 new MapMaker().concurrencyLevel(
82 configuration.getNettyServerExecutionConcurrency()).makeMap();
83 partitionEdges = (ConcurrentMap<I, OutEdges<I, E>>)
84 transientEdges.putIfAbsent(partitionId, newPartitionEdges);
85 if (partitionEdges == null) {
86 partitionEdges = newPartitionEdges;
87 }
88 }
89 return partitionEdges;
90 }
91
92 @Override
93 protected OutEdges<I, E> getPartitionEdges(
94 Map.Entry<I, OutEdges<I, E>> entry) {
95 return entry.getValue();
96 }
97
98 @Override
99 protected void writeVertexKey(I key, DataOutput output) throws IOException {
100 key.write(output);
101 }
102
103 @Override
104 protected I readVertexKey(DataInput input) throws IOException {
105 I id = configuration.createVertexId();
106 id.readFields(input);
107 return id;
108 }
109
110 @Override
111 protected Iterator<Map.Entry<I, OutEdges<I, E>>>
112 getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
113 return partitionEdges.entrySet().iterator();
114 }
115
116 @Override
117 protected OutEdges<I, E> getVertexOutEdges(
118 VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
119 Map<I, OutEdges<I, E>> partitionEdgesIn) {
120 ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
121 (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;
122 I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
123 OutEdges<I, E> outEdges = partitionEdges.get(vertexId);
124 if (outEdges == null) {
125 OutEdges<I, E> newOutEdges =
126 configuration.createAndInitializeInputOutEdges();
127 outEdges = partitionEdges.putIfAbsent(vertexId, newOutEdges);
128 if (outEdges == null) {
129 outEdges = newOutEdges;
130
131
132 vertexIdEdgeIterator.releaseCurrentVertexId();
133 }
134 }
135 return outEdges;
136 }
137 }