1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.requests;
2021import org.apache.giraph.comm.ServerData;
22import org.apache.giraph.graph.VertexMutations;
23import org.apache.giraph.metrics.GiraphMetrics;
24import org.apache.giraph.metrics.MetricNames;
25import org.apache.hadoop.io.Writable;
26import org.apache.hadoop.io.WritableComparable;
27import org.apache.log4j.Logger;
2829import com.google.common.collect.Maps;
30import com.yammer.metrics.core.Histogram;
3132import java.io.DataInput;
33import java.io.DataOutput;
34import java.io.IOException;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.concurrent.ConcurrentHashMap;
38import java.util.concurrent.ConcurrentMap;
3940/**41 * Send a collection of vertex mutations for a partition. This type of request42 * is used for two purposes: 1) sending mutation requests generated due to user43 * compute function in the middle of the execution of a superstep, and44 * 2) sending mutation requests due to partition migration.45 *46 * @param <I> Vertex id47 * @param <V> Vertex data48 * @param <E> Edge data49 */50 @SuppressWarnings("rawtypes")
51publicclass SendPartitionMutationsRequest<I extends WritableComparable,
52 V extends Writable, E extends Writable> extends53 WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
54/** Class logger */55privatestaticfinal Logger LOG =
56 Logger.getLogger(SendPartitionMutationsRequest.class);
57/** Partition id */58privateint partitionId;
59/** Mutations sent for a partition */60private Map<I, VertexMutations<I, V, E>> vertexIdMutations;
6162/**63 * Constructor used for reflection only64 */65publicSendPartitionMutationsRequest() { }
6667/**68 * Constructor used to send request.69 *70 * @param partitionId Partition to send the request to71 * @param vertexIdMutations Map of mutations to send72 */73publicSendPartitionMutationsRequest(
74int partitionId,
75 Map<I, VertexMutations<I, V, E>> vertexIdMutations) {
76this.partitionId = partitionId;
77this.vertexIdMutations = vertexIdMutations;
78 }
7980 @Override
81publicvoid readFieldsRequest(DataInput input) throws IOException {
82 partitionId = input.readInt();
83int vertexIdMutationsSize = input.readInt();
84// The request is going to be served by adding/merging it with the current85// mutations stored in ServerData. Since the mutations stored in ServerData86// is in the form of a ConcurrentMap, the data here is being read in this87// form, so it would be more efficient to merge/add the mutations in this88// request with/to mutations stored in SeverData.89 vertexIdMutations = Maps.newConcurrentMap();
90for (int i = 0; i < vertexIdMutationsSize; ++i) {
91 I vertexId = getConf().createVertexId();
92 vertexId.readFields(input);
93 VertexMutations<I, V, E> vertexMutations =
94new VertexMutations<I, V, E>();
95 vertexMutations.setConf(getConf());
96 vertexMutations.readFields(input);
97if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
98thrownew IllegalStateException(
99"readFields: Already has vertex id " + vertexId);
100 }
101 }
102 }
103104 @Override
105publicvoid writeRequest(DataOutput output) throws IOException {
106 output.writeInt(partitionId);
107 output.writeInt(vertexIdMutations.size());
108for (Entry<I, VertexMutations<I, V, E>> entry :
109 vertexIdMutations.entrySet()) {
110 entry.getKey().write(output);
111 entry.getValue().write(output);
112 }
113 }
114115 @Override
116publicRequestType getType() {
117return RequestType.SEND_PARTITION_MUTATIONS_REQUEST;
118 }
119120 @Override
121publicvoid doRequest(ServerData<I, V, E> serverData) {
122 ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
123 partitionMutations = serverData.getPartitionMutations();
124 Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
125 .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
126int mutationSize = 0;
127for (Map<I, VertexMutations<I, V, E>> map : partitionMutations.values()) {
128 mutationSize += map.size();
129 }
130 verticesInMutationHist.update(mutationSize);
131// If the request is a result of sending mutations in the middle of the132// superstep to local partitions, the request is "short-circuit"ed and133// vertexIdMutations is coming from an instance of SendMutationsCache.134// Since the vertex mutations are created locally, they are not stored in135// a ConcurrentMap. So, we first need to transform the data structure136// for more efficiently merge/add process.137if (!(vertexIdMutations instanceof ConcurrentMap)) {
138 vertexIdMutations = new ConcurrentHashMap<>(vertexIdMutations);
139 }
140141 ConcurrentMap<I, VertexMutations<I, V, E>> currentVertexIdMutations =
142 partitionMutations.putIfAbsent(partitionId,
143 (ConcurrentMap<I, VertexMutations<I, V, E>>) vertexIdMutations);
144145if (currentVertexIdMutations != null) {
146for (Entry<I, VertexMutations<I, V, E>> entry : vertexIdMutations
147 .entrySet()) {
148 VertexMutations<I, V, E> mutations = currentVertexIdMutations
149 .putIfAbsent(entry.getKey(), entry.getValue());
150if (mutations != null) {
151synchronized (mutations) {
152 mutations.addVertexMutations(entry.getValue());
153 }
154 }
155 }
156 }
157 }
158159 @Override
160publicint getSerializedSize() {
161return WritableRequest.UNKNOWN_SIZE;
162 }
163 }