/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.comm;
1920import org.apache.giraph.edge.Edge;
21import org.apache.giraph.graph.Vertex;
22import org.apache.giraph.graph.VertexMutations;
23import org.apache.hadoop.io.Writable;
24import org.apache.hadoop.io.WritableComparable;
2526import java.util.HashMap;
27import java.util.Map;
2829/**30 * Aggregates the mutations to be sent to partitions so they can be sent in31 * bulk. Not thread-safe.32 *33 * @param <I> Vertex id34 * @param <V> Vertex data35 * @param <E> Edge data36 */37 @SuppressWarnings("rawtypes")
38publicclass SendMutationsCache<I extends WritableComparable,
39 V extends Writable, E extends Writable> {
40/** Internal cache */41private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
42new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
43/** Number of mutations in each partition */44privatefinal Map<Integer, Integer> mutationCountMap =
45new HashMap<Integer, Integer>();
4647/**48 * Get the mutations for a partition and destination vertex (creating if49 * it doesn't exist).50 *51 * @param partitionId Partition id52 * @param destVertexId Destination vertex id53 * @return Mutations for the vertex54 */55private VertexMutations<I, V, E> getVertexMutations(
56 Integer partitionId, I destVertexId) {
57 Map<I, VertexMutations<I, V, E>> idMutations =
58 mutationCache.get(partitionId);
59if (idMutations == null) {
60 idMutations = new HashMap<I, VertexMutations<I, V, E>>();
61 mutationCache.put(partitionId, idMutations);
62 }
63 VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
64if (mutations == null) {
65 mutations = new VertexMutations<I, V, E>();
66 idMutations.put(destVertexId, mutations);
67 }
68return mutations;
69 }
7071/**72 * Increment the number of mutations in a partition.73 *74 * @param partitionId Partition id75 * @return Number of mutations in a partition after the increment76 */77privateint incrementPartitionMutationCount(int partitionId) {
78 Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
79if (currentPartitionMutationCount == null) {
80 currentPartitionMutationCount = 0;
81 }
82 Integer updatedPartitionMutationCount =
83 currentPartitionMutationCount + 1;
84 mutationCountMap.put(partitionId, updatedPartitionMutationCount);
85return updatedPartitionMutationCount;
86 }
8788/**89 * Add an add edge mutation to the cache.90 *91 * @param partitionId Partition id92 * @param destVertexId Destination vertex id93 * @param edge Edge to be added94 * @return Number of mutations in the partition.95 */96publicint addEdgeMutation(
97 Integer partitionId, I destVertexId, Edge<I, E> edge) {
98// Get the mutations for this partition99 VertexMutations<I, V, E> mutations =
100 getVertexMutations(partitionId, destVertexId);
101102// Add the edge103 mutations.addEdge(edge);
104105// Update the number of mutations per partition106return incrementPartitionMutationCount(partitionId);
107 }
108109/**110 * Add a remove edge mutation to the cache.111 *112 * @param partitionId Partition id113 * @param vertexIndex Destination vertex id114 * @param destinationVertexIndex Edge vertex index to be removed115 * @return Number of mutations in the partition.116 */117publicint removeEdgeMutation(
118 Integer partitionId, I vertexIndex, I destinationVertexIndex) {
119// Get the mutations for this partition120 VertexMutations<I, V, E> mutations =
121 getVertexMutations(partitionId, vertexIndex);
122123// Remove the edge124 mutations.removeEdge(destinationVertexIndex);
125126// Update the number of mutations per partition127return incrementPartitionMutationCount(partitionId);
128 }
129130/**131 * Add a add vertex mutation to the cache.132 *133 * @param partitionId Partition id134 * @param vertex Vertex to be added135 * @return Number of mutations in the partition.136 */137publicint addVertexMutation(
138 Integer partitionId, Vertex<I, V, E> vertex) {
139// Get the mutations for this partition140 VertexMutations<I, V, E> mutations =
141 getVertexMutations(partitionId, vertex.getId());
142143// Add the vertex144 mutations.addVertex(vertex);
145146// Update the number of mutations per partition147return incrementPartitionMutationCount(partitionId);
148 }
149150/**151 * Add a remove vertex mutation to the cache.152 *153 * @param partitionId Partition id154 * @param destVertexId Vertex index to be removed155 * @return Number of mutations in the partition.156 */157publicint removeVertexMutation(
158 Integer partitionId, I destVertexId) {
159// Get the mutations for this partition160 VertexMutations<I, V, E> mutations =
161 getVertexMutations(partitionId, destVertexId);
162163// Remove the vertex164 mutations.removeVertex();
165166// Update the number of mutations per partition167return incrementPartitionMutationCount(partitionId);
168 }
169170/**171 * Gets the mutations for a partition and removes it from the cache.172 *173 * @param partitionId Partition id174 * @return Removed partition mutations175 */176public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
177int partitionId) {
178 Map<I, VertexMutations<I, V, E>> idMutations =
179 mutationCache.remove(partitionId);
180 mutationCountMap.put(partitionId, 0);
181return idMutations;
182 }
183184/**185 * Gets all the mutations and removes them from the cache.186 *187 * @return All vertex mutations for all partitions188 */189public Map<Integer, Map<I, VertexMutations<I, V, E>>>
190 removeAllPartitionMutations() {
191 Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
192 mutationCache;
193 mutationCache =
194new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
195 mutationCountMap.clear();
196return allMutations;
197 }
198 }