This project has retired. For details please refer to its Attic page.
SendDataCache xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Maps;
23  import org.apache.giraph.bsp.CentralizedServiceWorker;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.partition.PartitionOwner;
26  import org.apache.giraph.utils.PairList;
27  import org.apache.giraph.worker.WorkerInfo;
28  
29  import javax.annotation.concurrent.NotThreadSafe;
30  import java.util.List;
31  import java.util.Map;
32  
33  /**
34   * An abstract structure for caching data by partitions
35   * to be sent to workers in bulk. Not thread-safe.
36   *
37   * @param <D> Data type of partition cache
38   */
39  @NotThreadSafe
40  @SuppressWarnings("unchecked")
41  public abstract class SendDataCache<D> {
42    /**
43     * Internal cache of partitions (index) to their partition caches of
44     * type D.
45     */
46    private final D[] dataCache;
47    /** How big to initially make output streams for each worker's partitions */
48    private final int[] initialBufferSizes;
49    /** Service worker */
50    private final CentralizedServiceWorker serviceWorker;
51    /** Size of data (in bytes) for each worker */
52    private final int[] dataSizes;
53    /** Total number of workers */
54    private final int numWorkers;
55    /** List of partition ids belonging to a worker */
56    private final Map<WorkerInfo, List<Integer>> workerPartitions =
57        Maps.newHashMap();
58    /** Giraph configuration */
59    private final ImmutableClassesGiraphConfiguration conf;
60  
61    /**
62     * Constructor.
63     *
64     * @param conf Giraph configuration
65     * @param serviceWorker Service worker
66     * @param maxRequestSize Maximum request size (in bytes)
67     * @param additionalRequestSize Additional request size (expressed as a
68     *                              ratio of the average request size)
69     */
70    public SendDataCache(ImmutableClassesGiraphConfiguration conf,
71                         CentralizedServiceWorker<?, ?, ?> serviceWorker,
72                         int maxRequestSize,
73                         float additionalRequestSize) {
74      this.conf = conf;
75      this.serviceWorker = serviceWorker;
76      int maxPartition = 0;
77      for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
78        List<Integer> workerPartitionIds =
79            workerPartitions.get(partitionOwner.getWorkerInfo());
80        if (workerPartitionIds == null) {
81          workerPartitionIds = Lists.newArrayList();
82          workerPartitions.put(partitionOwner.getWorkerInfo(),
83              workerPartitionIds);
84        }
85        workerPartitionIds.add(partitionOwner.getPartitionId());
86        maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
87      }
88      dataCache = (D[]) new Object[maxPartition + 1];
89  
90      int maxWorker = 0;
91      for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
92        maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
93      }
94      dataSizes = new int[maxWorker + 1];
95  
96      int initialRequestSize =
97          (int) (maxRequestSize * (1 + additionalRequestSize));
98      initialBufferSizes = new int[maxWorker + 1];
99      for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
100       initialBufferSizes[workerInfo.getTaskId()] =
101           initialRequestSize / workerPartitions.get(workerInfo).size();
102     }
103     numWorkers = maxWorker + 1;
104   }
105 
106   /**
107    * Gets the data for a worker and removes it from the cache.
108    *
109    * @param workerInfo the address of the worker who owns the data
110    *                   partitions that are receiving the data
111    * @return List of pairs (partitionId, ByteArrayVertexIdData),
112    *         where all partition ids belong to workerInfo
113    */
114   public PairList<Integer, D>
115   removeWorkerData(WorkerInfo workerInfo) {
116     PairList<Integer, D> workerData = new PairList<Integer, D>();
117     List<Integer> partitions = workerPartitions.get(workerInfo);
118     workerData.initialize(partitions.size());
119     for (Integer partitionId : partitions) {
120       if (dataCache[partitionId] != null) {
121         workerData.add(partitionId, (D) dataCache[partitionId]);
122         dataCache[partitionId] = null;
123       }
124     }
125     dataSizes[workerInfo.getTaskId()] = 0;
126     return workerData;
127   }
128 
129   /**
130    * Gets all the data and removes it from the cache.
131    *
132    * @return All data for all vertices for all partitions
133    */
134   public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() {
135     PairList<WorkerInfo, PairList<Integer, D>> allData =
136         new PairList<WorkerInfo, PairList<Integer, D>>();
137     allData.initialize(dataSizes.length);
138     for (WorkerInfo workerInfo : workerPartitions.keySet()) {
139       PairList<Integer, D> workerData = removeWorkerData(workerInfo);
140       if (!workerData.isEmpty()) {
141         allData.add(workerInfo, workerData);
142       }
143       dataSizes[workerInfo.getTaskId()] = 0;
144     }
145     return allData;
146   }
147 
148   /**
149    * Get the data cache for a partition id
150    *
151    * @param partitionId Partition id
152    * @return Data cache for a partition
153    */
154   public D getData(int partitionId) {
155     return dataCache[partitionId];
156   }
157 
158   /**
159    * Set the data cache for a partition id
160    *
161    * @param partitionId Partition id
162    * @param data Data to be set for a partition id
163    */
164   public void setData(int partitionId, D data) {
165     dataCache[partitionId] = data;
166   }
167 
168   /**
169    * Get initial buffer size of a partition.
170    *
171    * @param partitionId Partition id
172    * @return Initial buffer size of a partition
173    */
174   public int getInitialBufferSize(int partitionId) {
175     return initialBufferSizes[partitionId];
176   }
177 
178   /**
179    * Increment the data size
180    *
181    * @param partitionId Partition id
182    * @param size Size to increment by
183    * @return new data size
184    */
185   public int incrDataSize(int partitionId, int size) {
186     dataSizes[partitionId] += size;
187     return dataSizes[partitionId];
188   }
189 
190   public ImmutableClassesGiraphConfiguration getConf() {
191     return conf;
192   }
193 
194   /**
195    * Get the service worker.
196    *
197    * @return CentralizedServiceWorker
198    */
199   protected CentralizedServiceWorker getServiceWorker() {
200     return serviceWorker;
201   }
202 
203   /**
204    * Get the initial buffer size for the messages sent to a worker.
205    *
206    * @param taskId The task ID of a worker.
207    * @return The initial buffer size for a worker.
208    */
209   protected int getSendWorkerInitialBufferSize(int taskId) {
210     return initialBufferSizes[taskId];
211   }
212 
213   protected int getNumWorkers() {
214     return this.numWorkers;
215   }
216 
217   protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
218     return workerPartitions;
219   }
220 }