1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
25 import org.apache.giraph.utils.PairList;
26 import org.apache.giraph.utils.VertexIdEdges;
27 import org.apache.giraph.worker.WorkerInfo;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30
31 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE;
32 import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
33
34 /**
35 * Aggregates the edges to be sent to workers so they can be sent
36 * in bulk. Not thread-safe.
37 *
38 * @param <I> Vertex id
39 * @param <E> Edge value
40 */
41 public class SendEdgeCache<I extends WritableComparable, E extends Writable>
42 extends SendVertexIdDataCache<I, Edge<I, E>, VertexIdEdges<I, E>> {
43 /**
44 * Constructor
45 *
46 * @param conf Giraph configuration
47 * @param serviceWorker Service worker
48 */
49 public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
50 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
51 super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
52 ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
53 }
54
55 @Override
56 public VertexIdEdges<I, E> createVertexIdData() {
57 return new ByteArrayVertexIdEdges<I, E>();
58 }
59
60 /**
61 * Add an edge to the cache.
62 *
63 * @param workerInfo the remote worker destination
64 * @param partitionId the remote Partition this edge belongs to
65 * @param destVertexId vertex id that is ultimate destination
66 * @param edge Edge to send to remote worker
67 * @return Size of edges for the worker.
68 */
69 public int addEdge(WorkerInfo workerInfo,
70 int partitionId, I destVertexId, Edge<I, E> edge) {
71 return addData(workerInfo, partitionId, destVertexId, edge);
72 }
73
74 /**
75 * Gets the edges for a worker and removes it from the cache.
76 *
77 * @param workerInfo the address of the worker who owns the data
78 * partitions that are receiving the edges
79 * @return List of pairs (partitionId, ByteArrayVertexIdEdges),
80 * where all partition ids belong to workerInfo
81 */
82 public PairList<Integer, VertexIdEdges<I, E>>
83 removeWorkerEdges(WorkerInfo workerInfo) {
84 return removeWorkerData(workerInfo);
85 }
86
87 /**
88 * Gets all the edges and removes them from the cache.
89 *
90 * @return All vertex edges for all partitions
91 */
92 public PairList<WorkerInfo, PairList<Integer, VertexIdEdges<I, E>>>
93 removeAllEdges() {
94 return removeAllData();
95 }
96 }