This project has retired. For details please refer to its Attic page.
SendWorkerEdgesRequest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import org.apache.giraph.comm.ServerData;
22  import org.apache.giraph.edge.Edge;
23  import org.apache.giraph.utils.ByteArrayVertexIdEdges;
24  import org.apache.giraph.utils.PairList;
25  import org.apache.giraph.utils.VertexIdEdges;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  /**
30   * Send a collection of edges for a partition.
31   *
32   * @param <I> Vertex id
33   * @param <E> Edge data
34   */
35  @SuppressWarnings("unchecked")
36  public class SendWorkerEdgesRequest<I extends WritableComparable,
37      E extends Writable>
38      extends SendWorkerDataRequest<I, Edge<I, E>,
39      VertexIdEdges<I, E>> {
40    /**
41     * Constructor used for reflection only
42     */
43    public SendWorkerEdgesRequest() { }
44  
45    /**
46     * Constructor used to send request.
47     *
48     * @param partVertEdges Map of remote partitions =&gt;
49     *                     ByteArrayVertexIdEdges
50     */
51    public SendWorkerEdgesRequest(
52        PairList<Integer, VertexIdEdges<I, E>> partVertEdges) {
53      this.partitionVertexData = partVertEdges;
54    }
55  
56    @Override
57    public VertexIdEdges<I, E> createVertexIdData() {
58      return new ByteArrayVertexIdEdges<>();
59    }
60  
61    @Override
62    public RequestType getType() {
63      return RequestType.SEND_WORKER_EDGES_REQUEST;
64    }
65  
66    @Override
67    public void doRequest(ServerData serverData) {
68      PairList<Integer, VertexIdEdges<I, E>>.Iterator
69          iterator = partitionVertexData.getIterator();
70      while (iterator.hasNext()) {
71        iterator.next();
72        serverData.getEdgeStore()
73            .addPartitionEdges(iterator.getCurrentFirst(),
74                iterator.getCurrentSecond());
75      }
76    }
77  }