This project has retired. For details please refer to its Attic page.
SendPartitionCache xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm;
19  
20  import org.apache.giraph.bsp.CentralizedServiceWorker;
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.graph.Vertex;
23  import org.apache.giraph.partition.PartitionOwner;
24  import org.apache.giraph.utils.ExtendedDataOutput;
25  import org.apache.giraph.utils.WritableUtils;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.log4j.Logger;
29  
30  import java.io.IOException;
31  
32  import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
33  import static org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
34  
35  /**
36   * Caches partition vertices prior to sending.  Aggregating these requests
37   * will make larger, more efficient requests.  Not thread-safe.
38   *
39   * @param <I> Vertex index value
40   * @param <V> Vertex value
41   * @param <E> Edge value
42   */
43  public class SendPartitionCache<I extends WritableComparable,
44      V extends Writable, E extends Writable> extends
45      SendDataCache<ExtendedDataOutput> {
46    /** Class logger */
47    private static final Logger LOG =
48        Logger.getLogger(SendPartitionCache.class);
49  
50    /**
51     * Constructor.
52     *
53     * @param conf Giraph configuration
54     * @param serviceWorker Service worker
55     */
56    public SendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
57                              CentralizedServiceWorker<?, ?, ?> serviceWorker) {
58      super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
59          ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
60    }
61  
62    /**
63     * Add a vertex to the cache.
64     *
65     * @param partitionOwner Partition owner of the vertex
66     * @param vertex Vertex to add
67     * @return Size of partitions for this worker
68     */
69    public int addVertex(PartitionOwner partitionOwner,
70        Vertex<I, V, E> vertex) {
71      // Get the data collection
72      ExtendedDataOutput partitionData =
73          getData(partitionOwner.getPartitionId());
74      int taskId = partitionOwner.getWorkerInfo().getTaskId();
75      int originalSize = 0;
76      if (partitionData == null) {
77        partitionData = getConf().createExtendedDataOutput(
78            getInitialBufferSize(taskId));
79        setData(partitionOwner.getPartitionId(), partitionData);
80      } else {
81        originalSize = partitionData.getPos();
82      }
83      try {
84        WritableUtils.<I, V, E>writeVertexToDataOutput(
85            partitionData, vertex, getConf());
86      } catch (IOException e) {
87        throw new IllegalStateException("addVertex: Failed to serialize", e);
88      }
89  
90      // Update the size of cached, outgoing data per worker
91      return incrDataSize(taskId, partitionData.getPos() - originalSize);
92    }
93  }
94