This project has been retired. For details, please refer to its Apache Attic page.
SendVertexIdDataCache xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.utils.VertexIdData;
24  import org.apache.giraph.worker.WorkerInfo;
25  import org.apache.hadoop.io.WritableComparable;
26  
27  import javax.annotation.concurrent.NotThreadSafe;
28  
29  /**
30   * An abstract structure for caching data indexed by vertex id,
31   * to be sent to workers in bulk. Not thread-safe.
32   *
33   * @param <I> Vertex id
34   * @param <T> Data
35   * @param <B> Specialization of {@link VertexIdData} for T
36   */
37  @NotThreadSafe
38  @SuppressWarnings("unchecked")
39  public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
40      B extends VertexIdData<I, T>> extends SendDataCache<B> {
41    /**
42     * Constructor.
43     *
44     * @param conf Giraph configuration
45     * @param serviceWorker Service worker
46     * @param maxRequestSize Maximum request size (in bytes)
47     * @param additionalRequestSize Additional request size (expressed as a
48     *                              ratio of the average request size)
49     */
50    public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf,
51                                 CentralizedServiceWorker<?, ?, ?> serviceWorker,
52                                 int maxRequestSize,
53                                 float additionalRequestSize) {
54      super(conf, serviceWorker, maxRequestSize, additionalRequestSize);
55    }
56  
57    /**
58     * Create a new {@link VertexIdData} specialized for the use case.
59     *
60     * @return A new instance of {@link VertexIdData}
61     */
62    public abstract B createVertexIdData();
63  
64    /**
65     * Add data to the cache.
66     *
67     * @param workerInfo the remote worker destination
68     * @param partitionId the remote Partition this message belongs to
69     * @param destVertexId vertex id that is ultimate destination
70     * @param data Data to send to remote worker
71     * @return Size of messages for the worker.
72     */
73    public int addData(WorkerInfo workerInfo,
74                       int partitionId, I destVertexId, T data) {
75      // Get the data collection
76      VertexIdData<I, T> partitionData =
77          getPartitionData(workerInfo, partitionId);
78      int originalSize = partitionData.getSize();
79      partitionData.add(destVertexId, data);
80      // Update the size of cached, outgoing data per worker
81      return incrDataSize(workerInfo.getTaskId(),
82          partitionData.getSize() - originalSize);
83    }
84  
85    /**
86     * This method is similar to the method above,
87     * but use a serialized id to replace original I type
88     * destVertexId.
89     *
90     * @param workerInfo The remote worker destination
91     * @param partitionId The remote Partition this message belongs to
92     * @param serializedId The byte array to store the serialized target vertex id
93     * @param idPos The length of bytes of serialized id in the byte array above
94     * @param data Data to send to remote worker
95     * @return The number of bytes added to the target worker
96     */
97    public int addData(WorkerInfo workerInfo, int partitionId,
98                       byte[] serializedId, int idPos, T data) {
99      // Get the data collection
100     VertexIdData<I, T> partitionData =
101         getPartitionData(workerInfo, partitionId);
102     int originalSize = partitionData.getSize();
103     partitionData.add(serializedId, idPos, data);
104     // Update the size of cached, outgoing data per worker
105     return incrDataSize(workerInfo.getTaskId(),
106         partitionData.getSize() - originalSize);
107   }
108 
109   /**
110    * This method tries to get a partition data from the data cache.
111    * If null, it will create one.
112    *
113    * @param workerInfo The remote worker destination
114    * @param partitionId The remote Partition this message belongs to
115    * @return The partition data in data cache
116    */
117   private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
118                                                        int partitionId) {
119     // Get the data collection
120     B partitionData = getData(partitionId);
121     if (partitionData == null) {
122       partitionData = createVertexIdData();
123       partitionData.setConf(getConf());
124       partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
125       setData(partitionId, partitionData);
126     }
127 
128     return partitionData;
129   }
130 }