/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm;
2021import com.google.common.collect.Lists;
22import com.google.common.collect.Maps;
23import org.apache.giraph.bsp.CentralizedServiceWorker;
24import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25import org.apache.giraph.partition.PartitionOwner;
26import org.apache.giraph.utils.PairList;
27import org.apache.giraph.worker.WorkerInfo;
2829import javax.annotation.concurrent.NotThreadSafe;
30import java.util.List;
31import java.util.Map;
3233/**34 * An abstract structure for caching data by partitions35 * to be sent to workers in bulk. Not thread-safe.36 *37 * @param <D> Data type of partition cache38 */39 @NotThreadSafe
40 @SuppressWarnings("unchecked")
41publicabstractclass SendDataCache<D> {
42/**43 * Internal cache of partitions (index) to their partition caches of44 * type D.45 */46privatefinal D[] dataCache;
47/** How big to initially make output streams for each worker's partitions */48privatefinalint[] initialBufferSizes;
49/** Service worker */50privatefinalCentralizedServiceWorker serviceWorker;
51/** Size of data (in bytes) for each worker */52privatefinalint[] dataSizes;
53/** Total number of workers */54privatefinalint numWorkers;
55/** List of partition ids belonging to a worker */56privatefinal Map<WorkerInfo, List<Integer>> workerPartitions =
57 Maps.newHashMap();
58/** Giraph configuration */59privatefinalImmutableClassesGiraphConfiguration conf;
6061/**62 * Constructor.63 *64 * @param conf Giraph configuration65 * @param serviceWorker Service worker66 * @param maxRequestSize Maximum request size (in bytes)67 * @param additionalRequestSize Additional request size (expressed as a68 * ratio of the average request size)69 */70publicSendDataCache(ImmutableClassesGiraphConfiguration conf,
71 CentralizedServiceWorker<?, ?, ?> serviceWorker,
72int maxRequestSize,
73float additionalRequestSize) {
74this.conf = conf;
75this.serviceWorker = serviceWorker;
76int maxPartition = 0;
77for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
78 List<Integer> workerPartitionIds =
79 workerPartitions.get(partitionOwner.getWorkerInfo());
80if (workerPartitionIds == null) {
81 workerPartitionIds = Lists.newArrayList();
82 workerPartitions.put(partitionOwner.getWorkerInfo(),
83 workerPartitionIds);
84 }
85 workerPartitionIds.add(partitionOwner.getPartitionId());
86 maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
87 }
88 dataCache = (D[]) new Object[maxPartition + 1];
8990int maxWorker = 0;
91for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
92 maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
93 }
94 dataSizes = newint[maxWorker + 1];
9596int initialRequestSize =
97 (int) (maxRequestSize * (1 + additionalRequestSize));
98 initialBufferSizes = newint[maxWorker + 1];
99for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
100 initialBufferSizes[workerInfo.getTaskId()] =
101 initialRequestSize / workerPartitions.get(workerInfo).size();
102 }
103 numWorkers = maxWorker + 1;
104 }
105106/**107 * Gets the data for a worker and removes it from the cache.108 *109 * @param workerInfo the address of the worker who owns the data110 * partitions that are receiving the data111 * @return List of pairs (partitionId, ByteArrayVertexIdData),112 * where all partition ids belong to workerInfo113 */114public PairList<Integer, D>
115 removeWorkerData(WorkerInfo workerInfo) {
116 PairList<Integer, D> workerData = new PairList<Integer, D>();
117 List<Integer> partitions = workerPartitions.get(workerInfo);
118 workerData.initialize(partitions.size());
119for (Integer partitionId : partitions) {
120if (dataCache[partitionId] != null) {
121 workerData.add(partitionId, (D) dataCache[partitionId]);
122 dataCache[partitionId] = null;
123 }
124 }
125 dataSizes[workerInfo.getTaskId()] = 0;
126return workerData;
127 }
128129/**130 * Gets all the data and removes it from the cache.131 *132 * @return All data for all vertices for all partitions133 */134public PairList<WorkerInfo, PairList<Integer, D>> removeAllData() {
135 PairList<WorkerInfo, PairList<Integer, D>> allData =
136new PairList<WorkerInfo, PairList<Integer, D>>();
137 allData.initialize(dataSizes.length);
138for (WorkerInfo workerInfo : workerPartitions.keySet()) {
139 PairList<Integer, D> workerData = removeWorkerData(workerInfo);
140if (!workerData.isEmpty()) {
141 allData.add(workerInfo, workerData);
142 }
143 dataSizes[workerInfo.getTaskId()] = 0;
144 }
145return allData;
146 }
147148/**149 * Get the data cache for a partition id150 *151 * @param partitionId Partition id152 * @return Data cache for a partition153 */154public D getData(int partitionId) {
155return dataCache[partitionId];
156 }
157158/**159 * Set the data cache for a partition id160 *161 * @param partitionId Partition id162 * @param data Data to be set for a partition id163 */164publicvoid setData(int partitionId, D data) {
165 dataCache[partitionId] = data;
166 }
167168/**169 * Get initial buffer size of a partition.170 *171 * @param partitionId Partition id172 * @return Initial buffer size of a partition173 */174publicint getInitialBufferSize(int partitionId) {
175return initialBufferSizes[partitionId];
176 }
177178/**179 * Increment the data size180 *181 * @param partitionId Partition id182 * @param size Size to increment by183 * @return new data size184 */185publicint incrDataSize(int partitionId, int size) {
186 dataSizes[partitionId] += size;
187return dataSizes[partitionId];
188 }
189190publicImmutableClassesGiraphConfiguration getConf() {
191return conf;
192 }
193194/**195 * Get the service worker.196 *197 * @return CentralizedServiceWorker198 */199protectedCentralizedServiceWorker getServiceWorker() {
200return serviceWorker;
201 }
202203/**204 * Get the initial buffer size for the messages sent to a worker.205 *206 * @param taskId The task ID of a worker.207 * @return The initial buffer size for a worker.208 */209protectedint getSendWorkerInitialBufferSize(int taskId) {
210return initialBufferSizes[taskId];
211 }
212213protectedint getNumWorkers() {
214returnthis.numWorkers;
215 }
216217protected Map<WorkerInfo, List<Integer>> getWorkerPartitions() {
218return workerPartitions;
219 }
220 }