This project has retired. For details please refer to its Attic page.
SendMutationsCache xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.giraph.graph.VertexMutations;
23  import org.apache.hadoop.io.Writable;
24  import org.apache.hadoop.io.WritableComparable;
25  
26  import java.util.HashMap;
27  import java.util.Map;
28  
29  /**
30   * Aggregates the mutations to be sent to partitions so they can be sent in
31   * bulk. Not thread-safe.
32   *
33   * @param <I> Vertex id
34   * @param <V> Vertex data
35   * @param <E> Edge data
36   */
37  @SuppressWarnings("rawtypes")
38  public class SendMutationsCache<I extends WritableComparable,
39      V extends Writable, E extends Writable> {
40    /** Internal cache */
41    private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
42        new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
43    /** Number of mutations in each partition */
44    private final Map<Integer, Integer> mutationCountMap =
45        new HashMap<Integer, Integer>();
46  
47    /**
48     * Get the mutations for a partition and destination vertex (creating if
49     * it doesn't exist).
50     *
51     * @param partitionId Partition id
52     * @param destVertexId Destination vertex id
53     * @return Mutations for the vertex
54     */
55    private VertexMutations<I, V, E> getVertexMutations(
56        Integer partitionId, I destVertexId) {
57      Map<I, VertexMutations<I, V, E>> idMutations =
58          mutationCache.get(partitionId);
59      if (idMutations == null) {
60        idMutations = new HashMap<I, VertexMutations<I, V, E>>();
61        mutationCache.put(partitionId, idMutations);
62      }
63      VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
64      if (mutations == null) {
65        mutations = new VertexMutations<I, V, E>();
66        idMutations.put(destVertexId, mutations);
67      }
68      return mutations;
69    }
70  
71    /**
72     * Increment the number of mutations in a partition.
73     *
74     * @param partitionId Partition id
75     * @return Number of mutations in a partition after the increment
76     */
77    private int incrementPartitionMutationCount(int partitionId) {
78      Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
79      if (currentPartitionMutationCount == null) {
80        currentPartitionMutationCount = 0;
81      }
82      Integer updatedPartitionMutationCount =
83          currentPartitionMutationCount + 1;
84      mutationCountMap.put(partitionId, updatedPartitionMutationCount);
85      return updatedPartitionMutationCount;
86    }
87  
88    /**
89     * Add an add edge mutation to the cache.
90     *
91     * @param partitionId Partition id
92     * @param destVertexId Destination vertex id
93     * @param edge Edge to be added
94     * @return Number of mutations in the partition.
95     */
96    public int addEdgeMutation(
97        Integer partitionId, I destVertexId, Edge<I, E> edge) {
98      // Get the mutations for this partition
99      VertexMutations<I, V, E> mutations =
100         getVertexMutations(partitionId, destVertexId);
101 
102     // Add the edge
103     mutations.addEdge(edge);
104 
105     // Update the number of mutations per partition
106     return incrementPartitionMutationCount(partitionId);
107   }
108 
109   /**
110    * Add a remove edge mutation to the cache.
111    *
112    * @param partitionId Partition id
113    * @param vertexIndex Destination vertex id
114    * @param destinationVertexIndex Edge vertex index to be removed
115    * @return Number of mutations in the partition.
116    */
117   public int removeEdgeMutation(
118       Integer partitionId, I vertexIndex, I destinationVertexIndex) {
119     // Get the mutations for this partition
120     VertexMutations<I, V, E> mutations =
121         getVertexMutations(partitionId, vertexIndex);
122 
123     // Remove the edge
124     mutations.removeEdge(destinationVertexIndex);
125 
126     // Update the number of mutations per partition
127     return incrementPartitionMutationCount(partitionId);
128   }
129 
130   /**
131    * Add a add vertex mutation to the cache.
132    *
133    * @param partitionId Partition id
134    * @param vertex Vertex to be added
135    * @return Number of mutations in the partition.
136    */
137   public int addVertexMutation(
138       Integer partitionId, Vertex<I, V, E> vertex) {
139     // Get the mutations for this partition
140     VertexMutations<I, V, E> mutations =
141         getVertexMutations(partitionId, vertex.getId());
142 
143     // Add the vertex
144     mutations.addVertex(vertex);
145 
146     // Update the number of mutations per partition
147     return incrementPartitionMutationCount(partitionId);
148   }
149 
150   /**
151    * Add a remove vertex mutation to the cache.
152    *
153    * @param partitionId Partition id
154    * @param destVertexId Vertex index to be removed
155    * @return Number of mutations in the partition.
156    */
157   public int removeVertexMutation(
158       Integer partitionId, I destVertexId) {
159     // Get the mutations for this partition
160     VertexMutations<I, V, E> mutations =
161         getVertexMutations(partitionId, destVertexId);
162 
163     // Remove the vertex
164     mutations.removeVertex();
165 
166     // Update the number of mutations per partition
167     return incrementPartitionMutationCount(partitionId);
168   }
169 
170   /**
171    * Gets the mutations for a partition and removes it from the cache.
172    *
173    * @param partitionId Partition id
174    * @return Removed partition mutations
175    */
176   public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
177       int partitionId) {
178     Map<I, VertexMutations<I, V, E>> idMutations =
179         mutationCache.remove(partitionId);
180     mutationCountMap.put(partitionId, 0);
181     return idMutations;
182   }
183 
184   /**
185    * Gets all the mutations and removes them from the cache.
186    *
187    * @return All vertex mutations for all partitions
188    */
189   public Map<Integer, Map<I, VertexMutations<I, V, E>>>
190   removeAllPartitionMutations() {
191     Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
192         mutationCache;
193     mutationCache =
194         new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
195     mutationCountMap.clear();
196     return allMutations;
197   }
198 }