This project has retired. For details please refer to its Attic page.
SendWorkerVerticesRequest xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.requests;
20  
21  import org.apache.giraph.comm.ServerData;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.utils.ExtendedDataOutput;
24  import org.apache.giraph.utils.PairList;
25  import org.apache.giraph.utils.WritableUtils;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.log4j.Logger;
29  
30  import java.io.DataInput;
31  import java.io.DataOutput;
32  import java.io.IOException;
33  
34  /**
35   * Send to a worker one or more partitions of vertices
36   *
37   * @param <I> Vertex id
38   * @param <V> Vertex data
39   * @param <E> Edge data
40   */
41  @SuppressWarnings("rawtypes")
42  public class SendWorkerVerticesRequest<I extends WritableComparable,
43      V extends Writable, E extends Writable> extends
44      WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
45    /** Class logger */
46    private static final Logger LOG =
47        Logger.getLogger(SendWorkerVerticesRequest.class);
48    /** Worker partitions to be sent */
49    private PairList<Integer, ExtendedDataOutput> workerPartitions;
50  
51    /**
52     * Constructor used for reflection only
53     */
54    public SendWorkerVerticesRequest() { }
55  
56    /**
57     * Constructor for sending a request.
58     *
59     * @param conf Configuration
60     * @param workerPartitions Partitions to be send in this request
61     */
62    public SendWorkerVerticesRequest(
63        ImmutableClassesGiraphConfiguration<I, V, E> conf,
64        PairList<Integer, ExtendedDataOutput> workerPartitions) {
65      this.workerPartitions = workerPartitions;
66      setConf(conf);
67    }
68  
69    @Override
70    public void readFieldsRequest(DataInput input) throws IOException {
71      int numPartitions = input.readInt();
72      workerPartitions = new PairList<Integer, ExtendedDataOutput>();
73      workerPartitions.initialize(numPartitions);
74      while (numPartitions-- > 0) {
75        final int partitionId = input.readInt();
76        ExtendedDataOutput partitionData =
77            WritableUtils.readExtendedDataOutput(input, getConf());
78        workerPartitions.add(partitionId, partitionData);
79      }
80    }
81  
82    @Override
83    public void writeRequest(DataOutput output) throws IOException {
84      output.writeInt(workerPartitions.getSize());
85      PairList<Integer, ExtendedDataOutput>.Iterator
86          iterator = workerPartitions.getIterator();
87      while (iterator.hasNext()) {
88        iterator.next();
89        output.writeInt(iterator.getCurrentFirst());
90        WritableUtils.writeExtendedDataOutput(
91            iterator.getCurrentSecond(), output);
92      }
93    }
94  
95    @Override
96    public RequestType getType() {
97      return RequestType.SEND_WORKER_VERTICES_REQUEST;
98    }
99  
100   @Override
101   public void doRequest(ServerData<I, V, E> serverData) {
102     PairList<Integer, ExtendedDataOutput>.Iterator
103         iterator = workerPartitions.getIterator();
104     while (iterator.hasNext()) {
105       iterator.next();
106       serverData.getPartitionStore()
107           .addPartitionVertices(iterator.getCurrentFirst(),
108               iterator.getCurrentSecond());
109     }
110   }
111 
112   @Override
113   public int getSerializedSize() {
114     // 4 for number of partitions
115     int size = super.getSerializedSize() + 4;
116     PairList<Integer, ExtendedDataOutput>.Iterator iterator =
117         workerPartitions.getIterator();
118     while (iterator.hasNext()) {
119       iterator.next();
120       // 4 bytes for the partition id and 4 bytes for the size
121       size += 8 + iterator.getCurrentSecond().getPos();
122     }
123     return size;
124   }
125 }
126