This project has retired. For details please refer to its Attic page.
SendPartitionMutationsRequest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import org.apache.giraph.comm.ServerData;
22  import org.apache.giraph.graph.VertexMutations;
23  import org.apache.giraph.metrics.GiraphMetrics;
24  import org.apache.giraph.metrics.MetricNames;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  import org.apache.log4j.Logger;
28  
29  import com.google.common.collect.Maps;
30  import com.yammer.metrics.core.Histogram;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  import java.util.Map;
36  import java.util.Map.Entry;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.ConcurrentMap;
39  
40  /**
41   * Send a collection of vertex mutations for a partition. This type of request
42   * is used for two purposes: 1) sending mutation requests generated due to user
43   * compute function in the middle of the execution of a superstep, and
44   * 2) sending mutation requests due to partition migration.
45   *
46   * @param <I> Vertex id
47   * @param <V> Vertex data
48   * @param <E> Edge data
49   */
50  @SuppressWarnings("rawtypes")
51  public class SendPartitionMutationsRequest<I extends WritableComparable,
52      V extends Writable, E extends Writable> extends
53      WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
54    /** Class logger */
55    private static final Logger LOG =
56        Logger.getLogger(SendPartitionMutationsRequest.class);
57    /** Partition id */
58    private int partitionId;
59    /** Mutations sent for a partition */
60    private Map<I, VertexMutations<I, V, E>> vertexIdMutations;
61  
62    /**
63     * Constructor used for reflection only
64     */
65    public SendPartitionMutationsRequest() { }
66  
67    /**
68     * Constructor used to send request.
69     *
70     * @param partitionId Partition to send the request to
71     * @param vertexIdMutations Map of mutations to send
72     */
73    public SendPartitionMutationsRequest(
74        int partitionId,
75        Map<I, VertexMutations<I, V, E>> vertexIdMutations) {
76      this.partitionId = partitionId;
77      this.vertexIdMutations = vertexIdMutations;
78    }
79  
80    @Override
81    public void readFieldsRequest(DataInput input) throws IOException {
82      partitionId = input.readInt();
83      int vertexIdMutationsSize = input.readInt();
84      // The request is going to be served by adding/merging it with the current
85      // mutations stored in ServerData. Since the mutations stored in ServerData
86      // is in the form of a ConcurrentMap, the data here is being read in this
87      // form, so it would be more efficient to merge/add the mutations in this
88      // request with/to mutations stored in SeverData.
89      vertexIdMutations = Maps.newConcurrentMap();
90      for (int i = 0; i < vertexIdMutationsSize; ++i) {
91        I vertexId = getConf().createVertexId();
92        vertexId.readFields(input);
93        VertexMutations<I, V, E> vertexMutations =
94            new VertexMutations<I, V, E>();
95        vertexMutations.setConf(getConf());
96        vertexMutations.readFields(input);
97        if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
98          throw new IllegalStateException(
99              "readFields: Already has vertex id " + vertexId);
100       }
101     }
102   }
103 
104   @Override
105   public void writeRequest(DataOutput output) throws IOException {
106     output.writeInt(partitionId);
107     output.writeInt(vertexIdMutations.size());
108     for (Entry<I, VertexMutations<I, V, E>> entry :
109         vertexIdMutations.entrySet()) {
110       entry.getKey().write(output);
111       entry.getValue().write(output);
112     }
113   }
114 
115   @Override
116   public RequestType getType() {
117     return RequestType.SEND_PARTITION_MUTATIONS_REQUEST;
118   }
119 
120   @Override
121   public void doRequest(ServerData<I, V, E> serverData) {
122     ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
123         partitionMutations = serverData.getPartitionMutations();
124     Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
125         .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
126     int mutationSize = 0;
127     for (Map<I, VertexMutations<I, V, E>> map : partitionMutations.values()) {
128       mutationSize += map.size();
129     }
130     verticesInMutationHist.update(mutationSize);
131     // If the request is a result of sending mutations in the middle of the
132     // superstep to local partitions, the request is "short-circuit"ed and
133     // vertexIdMutations is coming from an instance of SendMutationsCache.
134     // Since the vertex mutations are created locally, they are not stored in
135     // a ConcurrentMap. So, we first need to transform the data structure
136     // for more efficiently merge/add process.
137     if (!(vertexIdMutations instanceof ConcurrentMap)) {
138       vertexIdMutations = new ConcurrentHashMap<>(vertexIdMutations);
139     }
140 
141     ConcurrentMap<I, VertexMutations<I, V, E>> currentVertexIdMutations =
142         partitionMutations.putIfAbsent(partitionId,
143             (ConcurrentMap<I, VertexMutations<I, V, E>>) vertexIdMutations);
144 
145     if (currentVertexIdMutations != null) {
146       for (Entry<I, VertexMutations<I, V, E>> entry : vertexIdMutations
147           .entrySet()) {
148         VertexMutations<I, V, E> mutations = currentVertexIdMutations
149             .putIfAbsent(entry.getKey(), entry.getValue());
150         if (mutations != null) {
151           synchronized (mutations) {
152             mutations.addVertexMutations(entry.getValue());
153           }
154         }
155       }
156     }
157   }
158 
159   @Override
160   public int getSerializedSize() {
161     return WritableRequest.UNKNOWN_SIZE;
162   }
163 }