1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.comm;
1920import org.apache.giraph.bsp.CentralizedServiceWorker;
21import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22import org.apache.giraph.graph.Vertex;
23import org.apache.giraph.partition.PartitionOwner;
24import org.apache.giraph.utils.ExtendedDataOutput;
25import org.apache.giraph.utils.WritableUtils;
26import org.apache.hadoop.io.Writable;
27import org.apache.hadoop.io.WritableComparable;
28import org.apache.log4j.Logger;
2930import java.io.IOException;
3132importstatic org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
33importstatic org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
3435/**36 * Caches partition vertices prior to sending. Aggregating these requests37 * will make larger, more efficient requests. Not thread-safe.38 *39 * @param <I> Vertex index value40 * @param <V> Vertex value41 * @param <E> Edge value42 */43publicclass SendPartitionCache<I extends WritableComparable,
44 V extends Writable, E extends Writable> extends45 SendDataCache<ExtendedDataOutput> {
46/** Class logger */47privatestaticfinal Logger LOG =
48 Logger.getLogger(SendPartitionCache.class);
4950/**51 * Constructor.52 *53 * @param conf Giraph configuration54 * @param serviceWorker Service worker55 */56publicSendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
57 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
58super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
59 ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
60 }
6162/**63 * Add a vertex to the cache.64 *65 * @param partitionOwner Partition owner of the vertex66 * @param vertex Vertex to add67 * @return Size of partitions for this worker68 */69publicint addVertex(PartitionOwner partitionOwner,
70 Vertex<I, V, E> vertex) {
71// Get the data collection72ExtendedDataOutput partitionData =
73 getData(partitionOwner.getPartitionId());
74int taskId = partitionOwner.getWorkerInfo().getTaskId();
75int originalSize = 0;
76if (partitionData == null) {
77 partitionData = getConf().createExtendedDataOutput(
78 getInitialBufferSize(taskId));
79 setData(partitionOwner.getPartitionId(), partitionData);
80 } else {
81 originalSize = partitionData.getPos();
82 }
83try {
84 WritableUtils.<I, V, E>writeVertexToDataOutput(
85 partitionData, vertex, getConf());
86 } catch (IOException e) {
87thrownew IllegalStateException("addVertex: Failed to serialize", e);
88 }
8990// Update the size of cached, outgoing data per worker91return incrDataSize(taskId, partitionData.getPos() - originalSize);
92 }
93 }
94