This project has retired. For details please refer to its Attic page.
SimplePartition xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.partition;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Iterator;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import javax.annotation.concurrent.ThreadSafe;
28  
29  import org.apache.giraph.edge.Edge;
30  import org.apache.giraph.graph.Vertex;
31  import org.apache.giraph.utils.WritableUtils;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  import org.apache.hadoop.util.Progressable;
35  
36  import com.google.common.collect.Maps;
37  
38  /**
39   * A simple map-based container that stores vertices.  Vertex ids will map to
40   * exactly one partition.
41   *
42   * @param <I> Vertex id
43   * @param <V> Vertex data
44   * @param <E> Edge data
45   */
46  @ThreadSafe
47  @SuppressWarnings("rawtypes")
48  public class SimplePartition<I extends WritableComparable,
49      V extends Writable, E extends Writable>
50      extends BasicPartition<I, V, E> {
51    /** Vertex map for this range (keyed by index) */
52    private ConcurrentMap<I, Vertex<I, V, E>> vertexMap;
53  
54    /**
55     * Constructor for reflection.
56     */
57    public SimplePartition() { }
58  
59    @Override
60    public void initialize(int partitionId, Progressable progressable) {
61      super.initialize(partitionId, progressable);
62      vertexMap = Maps.newConcurrentMap();
63    }
64  
65    @Override
66    public Vertex<I, V, E> getVertex(I vertexIndex) {
67      return vertexMap.get(vertexIndex);
68    }
69  
70    @Override
71    public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
72      return vertexMap.put(vertex.getId(), vertex);
73    }
74  
75    @Override
76    public Vertex<I, V, E> removeVertex(I vertexIndex) {
77      return vertexMap.remove(vertexIndex);
78    }
79  
80    @Override
81    public boolean putOrCombine(Vertex<I, V, E> vertex) {
82      Vertex<I, V, E> originalVertex = vertexMap.get(vertex.getId());
83      if (originalVertex == null) {
84        originalVertex =
85            vertexMap.putIfAbsent(vertex.getId(), vertex);
86        if (originalVertex == null) {
87          return true;
88        }
89      }
90  
91      synchronized (originalVertex) {
92        // Combine the vertex values
93        getVertexValueCombiner().combine(
94            originalVertex.getValue(), vertex.getValue());
95  
96        // Add the edges to the representative vertex
97        for (Edge<I, E> edge : vertex.getEdges()) {
98          originalVertex.addEdge(edge);
99        }
100     }
101 
102     return false;
103   }
104 
105   @Override
106   public void addPartition(Partition<I, V, E> partition) {
107     for (Vertex<I, V, E> vertex : partition) {
108       putOrCombine(vertex);
109     }
110   }
111 
112   @Override
113   public long getVertexCount() {
114     return vertexMap.size();
115   }
116 
117   @Override
118   public long getEdgeCount() {
119     long edges = 0;
120     for (Vertex<I, V, E> vertex : vertexMap.values()) {
121       edges += vertex.getNumEdges();
122     }
123     return edges;
124   }
125 
126   @Override
127   public void saveVertex(Vertex<I, V, E> vertex) {
128     // No-op, vertices are stored as Java objects in this partition
129   }
130 
131   @Override
132   public String toString() {
133     return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
134   }
135 
136   @Override
137   public void readFields(DataInput input) throws IOException {
138     super.readFields(input);
139     vertexMap = Maps.newConcurrentMap();
140     int vertices = input.readInt();
141     for (int i = 0; i < vertices; ++i) {
142       progress();
143       Vertex<I, V, E> vertex =
144           WritableUtils.readVertexFromDataInput(input, getConf());
145       if (vertexMap.put(vertex.getId(), vertex) != null) {
146         throw new IllegalStateException(
147             "readFields: " + this +
148             " already has same id " + vertex);
149       }
150     }
151   }
152 
153   @Override
154   public void write(DataOutput output) throws IOException {
155     super.write(output);
156     output.writeInt(vertexMap.size());
157     for (Vertex<I, V, E> vertex : vertexMap.values()) {
158       progress();
159       WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
160     }
161   }
162 
163   @Override
164   public Iterator<Vertex<I, V, E>> iterator() {
165     return vertexMap.values().iterator();
166   }
167 }