1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.utils.VertexIdData;
24 import org.apache.giraph.worker.WorkerInfo;
25 import org.apache.hadoop.io.WritableComparable;
26
27 import javax.annotation.concurrent.NotThreadSafe;
28
29 /**
30 * An abstract structure for caching data indexed by vertex id,
31 * to be sent to workers in bulk. Not thread-safe.
32 *
33 * @param <I> Vertex id
34 * @param <T> Data
35 * @param <B> Specialization of {@link VertexIdData} for T
36 */
37 @NotThreadSafe
38 @SuppressWarnings("unchecked")
39 public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
40 B extends VertexIdData<I, T>> extends SendDataCache<B> {
41 /**
42 * Constructor.
43 *
44 * @param conf Giraph configuration
45 * @param serviceWorker Service worker
46 * @param maxRequestSize Maximum request size (in bytes)
47 * @param additionalRequestSize Additional request size (expressed as a
48 * ratio of the average request size)
49 */
50 public SendVertexIdDataCache(ImmutableClassesGiraphConfiguration conf,
51 CentralizedServiceWorker<?, ?, ?> serviceWorker,
52 int maxRequestSize,
53 float additionalRequestSize) {
54 super(conf, serviceWorker, maxRequestSize, additionalRequestSize);
55 }
56
57 /**
58 * Create a new {@link VertexIdData} specialized for the use case.
59 *
60 * @return A new instance of {@link VertexIdData}
61 */
62 public abstract B createVertexIdData();
63
64 /**
65 * Add data to the cache.
66 *
67 * @param workerInfo the remote worker destination
68 * @param partitionId the remote Partition this message belongs to
69 * @param destVertexId vertex id that is ultimate destination
70 * @param data Data to send to remote worker
71 * @return Size of messages for the worker.
72 */
73 public int addData(WorkerInfo workerInfo,
74 int partitionId, I destVertexId, T data) {
75 // Get the data collection
76 VertexIdData<I, T> partitionData =
77 getPartitionData(workerInfo, partitionId);
78 int originalSize = partitionData.getSize();
79 partitionData.add(destVertexId, data);
80 // Update the size of cached, outgoing data per worker
81 return incrDataSize(workerInfo.getTaskId(),
82 partitionData.getSize() - originalSize);
83 }
84
85 /**
86 * This method is similar to the method above,
87 * but use a serialized id to replace original I type
88 * destVertexId.
89 *
90 * @param workerInfo The remote worker destination
91 * @param partitionId The remote Partition this message belongs to
92 * @param serializedId The byte array to store the serialized target vertex id
93 * @param idPos The length of bytes of serialized id in the byte array above
94 * @param data Data to send to remote worker
95 * @return The number of bytes added to the target worker
96 */
97 public int addData(WorkerInfo workerInfo, int partitionId,
98 byte[] serializedId, int idPos, T data) {
99 // Get the data collection
100 VertexIdData<I, T> partitionData =
101 getPartitionData(workerInfo, partitionId);
102 int originalSize = partitionData.getSize();
103 partitionData.add(serializedId, idPos, data);
104 // Update the size of cached, outgoing data per worker
105 return incrDataSize(workerInfo.getTaskId(),
106 partitionData.getSize() - originalSize);
107 }
108
109 /**
110 * This method tries to get a partition data from the data cache.
111 * If null, it will create one.
112 *
113 * @param workerInfo The remote worker destination
114 * @param partitionId The remote Partition this message belongs to
115 * @return The partition data in data cache
116 */
117 private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
118 int partitionId) {
119 // Get the data collection
120 B partitionData = getData(partitionId);
121 if (partitionData == null) {
122 partitionData = createVertexIdData();
123 partitionData.setConf(getConf());
124 partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
125 setData(partitionId, partitionData);
126 }
127
128 return partitionData;
129 }
130 }