This project has retired. For details please refer to its
Attic page.
IntEdgeStore xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge.primitives;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.AbstractEdgeStore;
24 import org.apache.giraph.edge.OutEdges;
25 import org.apache.giraph.utils.VertexIdEdgeIterator;
26 import org.apache.hadoop.io.IntWritable;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.util.Progressable;
29
30 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
31 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
32 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
33
34 import java.io.DataInput;
35 import java.io.DataOutput;
36 import java.io.IOException;
37 import java.util.Iterator;
38 import java.util.Map;
39
40
41
42
43
44
45
46
47
48 public class IntEdgeStore<V extends Writable, E extends Writable>
49 extends AbstractEdgeStore<IntWritable, V, E, Integer,
50 Int2ObjectMap.Entry<OutEdges<IntWritable, E>>> {
51
52
53
54
55
56
57
58
59 public IntEdgeStore(
60 CentralizedServiceWorker<IntWritable, V, E> service,
61 ImmutableClassesGiraphConfiguration<IntWritable, V, E> configuration,
62 Progressable progressable) {
63 super(service, configuration, progressable);
64 }
65
66 @Override
67 protected IntWritable getVertexId(
68 Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry,
69 IntWritable representativeVertexId) {
70 representativeVertexId.set(entry.getIntKey());
71 return representativeVertexId;
72 }
73
74 @Override
75 protected IntWritable createVertexId(
76 Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry) {
77 return new IntWritable(entry.getIntKey());
78 }
79
80 @Override
81 protected OutEdges<IntWritable, E> getPartitionEdges(
82 Int2ObjectMap.Entry<OutEdges<IntWritable, E>> entry) {
83 return entry.getValue();
84 }
85
86 @Override
87 protected void writeVertexKey(Integer key, DataOutput output)
88 throws IOException {
89 output.writeInt(key);
90 }
91
92 @Override
93 protected Integer readVertexKey(DataInput input)
94 throws IOException {
95 return input.readInt();
96 }
97
98 @Override
99 protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>>
100 getPartitionEdgesIterator(
101 Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {
102 return ((Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdges)
103 .int2ObjectEntrySet()
104 .iterator();
105 }
106
107 @Override
108 protected Int2ObjectMap<OutEdges<IntWritable, E>> getPartitionEdges(
109 int partitionId) {
110 Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
111 (Int2ObjectMap<OutEdges<IntWritable, E>>)
112 transientEdges.get(partitionId);
113 if (partitionEdges == null) {
114 Int2ObjectMap<OutEdges<IntWritable, E>> newPartitionEdges =
115 Int2ObjectMaps.synchronize(
116 new Int2ObjectOpenHashMap<OutEdges<IntWritable, E>>());
117 partitionEdges = (Int2ObjectMap<OutEdges<IntWritable, E>>)
118 transientEdges.putIfAbsent(partitionId,
119 newPartitionEdges);
120 if (partitionEdges == null) {
121 partitionEdges = newPartitionEdges;
122 }
123 }
124 return partitionEdges;
125 }
126
127 @Override
128 protected OutEdges<IntWritable, E> getVertexOutEdges(
129 VertexIdEdgeIterator<IntWritable, E> vertexIdEdgeIterator,
130 Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
131 Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
132 (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;
133 IntWritable vertexId = vertexIdEdgeIterator.getCurrentVertexId();
134 OutEdges<IntWritable, E> outEdges = partitionEdges.get(vertexId.get());
135 if (outEdges == null) {
136 synchronized (partitionEdges) {
137 outEdges = partitionEdges.get(vertexId.get());
138 if (outEdges == null) {
139 outEdges = configuration.createAndInitializeInputOutEdges();
140 partitionEdges.put(vertexId.get(), outEdges);
141 }
142 }
143 }
144 return outEdges;
145 }
146 }