This project has retired. For details please refer to its
Attic page.
SimplePartition xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.partition;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.Iterator;
25 import java.util.concurrent.ConcurrentMap;
26
27 import javax.annotation.concurrent.ThreadSafe;
28
29 import org.apache.giraph.edge.Edge;
30 import org.apache.giraph.graph.Vertex;
31 import org.apache.giraph.utils.WritableUtils;
32 import org.apache.hadoop.io.Writable;
33 import org.apache.hadoop.io.WritableComparable;
34 import org.apache.hadoop.util.Progressable;
35
36 import com.google.common.collect.Maps;
37
38
39
40
41
42
43
44
45
46 @ThreadSafe
47 @SuppressWarnings("rawtypes")
48 public class SimplePartition<I extends WritableComparable,
49 V extends Writable, E extends Writable>
50 extends BasicPartition<I, V, E> {
51
52 private ConcurrentMap<I, Vertex<I, V, E>> vertexMap;
53
54
55
56
57 public SimplePartition() { }
58
59 @Override
60 public void initialize(int partitionId, Progressable progressable) {
61 super.initialize(partitionId, progressable);
62 vertexMap = Maps.newConcurrentMap();
63 }
64
65 @Override
66 public Vertex<I, V, E> getVertex(I vertexIndex) {
67 return vertexMap.get(vertexIndex);
68 }
69
70 @Override
71 public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
72 return vertexMap.put(vertex.getId(), vertex);
73 }
74
75 @Override
76 public Vertex<I, V, E> removeVertex(I vertexIndex) {
77 return vertexMap.remove(vertexIndex);
78 }
79
80 @Override
81 public boolean putOrCombine(Vertex<I, V, E> vertex) {
82 Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
83 if (originalVertex == null) {
84 originalVertex =
85 vertexMap.putIfAbsent(vertex.getId(), vertex);
86 if (originalVertex == null) {
87 return true;
88 }
89 }
90
91 synchronized (originalVertex) {
92
93 getVertexValueCombiner().combine(
94 originalVertex.getValue(), vertex.getValue());
95
96
97 for (Edge<I, E> edge : vertex.getEdges()) {
98 originalVertex.addEdge(edge);
99 }
100 }
101
102 return false;
103 }
104
105 @Override
106 public void addPartition(Partition<I, V, E> partition) {
107 for (Vertex<I, V, E> vertex : partition) {
108 putOrCombine(vertex);
109 }
110 }
111
112 @Override
113 public long getVertexCount() {
114 return vertexMap.size();
115 }
116
117 @Override
118 public long getEdgeCount() {
119 long edges = 0;
120 for (Vertex<I, V, E> vertex : vertexMap.values()) {
121 edges += vertex.getNumEdges();
122 }
123 return edges;
124 }
125
126 @Override
127 public void saveVertex(Vertex<I, V, E> vertex) {
128
129 }
130
131 @Override
132 public String toString() {
133 return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
134 }
135
136 @Override
137 public void readFields(DataInput input) throws IOException {
138 super.readFields(input);
139 vertexMap = Maps.newConcurrentMap();
140 int vertices = input.readInt();
141 for (int i = 0; i < vertices; ++i) {
142 progress();
143 Vertex<I, V, E> vertex =
144 WritableUtils.readVertexFromDataInput(input, getConf());
145 if (vertexMap.put(vertex.getId(), vertex) != null) {
146 throw new IllegalStateException(
147 "readFields: " + this +
148 " already has same id " + vertex);
149 }
150 }
151 }
152
153 @Override
154 public void write(DataOutput output) throws IOException {
155 super.write(output);
156 output.writeInt(vertexMap.size());
157 for (Vertex<I, V, E> vertex : vertexMap.values()) {
158 progress();
159 WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
160 }
161 }
162
163 @Override
164 public Iterator<Vertex<I, V, E>> iterator() {
165 return vertexMap.values().iterator();
166 }
167 }