1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm;
2021import org.apache.giraph.bsp.CentralizedServiceWorker;
22import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23import org.apache.giraph.edge.Edge;
24import org.apache.giraph.utils.ByteArrayVertexIdEdges;
25import org.apache.giraph.utils.PairList;
26import org.apache.giraph.utils.VertexIdEdges;
27import org.apache.giraph.worker.WorkerInfo;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.io.WritableComparable;
3031importstatic org.apache.giraph.conf.GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE;
32importstatic org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
3334/**35 * Aggregates the edges to be sent to workers so they can be sent36 * in bulk. Not thread-safe.37 *38 * @param <I> Vertex id39 * @param <E> Edge value40 */41publicclass SendEdgeCache<I extends WritableComparable, E extends Writable>
42extends SendVertexIdDataCache<I, Edge<I, E>, VertexIdEdges<I, E>> {
43/**44 * Constructor45 *46 * @param conf Giraph configuration47 * @param serviceWorker Service worker48 */49publicSendEdgeCache(ImmutableClassesGiraphConfiguration conf,
50 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
51super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
52 ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
53 }
5455 @Override
56public VertexIdEdges<I, E> createVertexIdData() {
57returnnew ByteArrayVertexIdEdges<I, E>();
58 }
5960/**61 * Add an edge to the cache.62 *63 * @param workerInfo the remote worker destination64 * @param partitionId the remote Partition this edge belongs to65 * @param destVertexId vertex id that is ultimate destination66 * @param edge Edge to send to remote worker67 * @return Size of edges for the worker.68 */69publicint addEdge(WorkerInfo workerInfo,
70int partitionId, I destVertexId, Edge<I, E> edge) {
71return addData(workerInfo, partitionId, destVertexId, edge);
72 }
7374/**75 * Gets the edges for a worker and removes it from the cache.76 *77 * @param workerInfo the address of the worker who owns the data78 * partitions that are receiving the edges79 * @return List of pairs (partitionId, ByteArrayVertexIdEdges),80 * where all partition ids belong to workerInfo81 */82public PairList<Integer, VertexIdEdges<I, E>>
83 removeWorkerEdges(WorkerInfo workerInfo) {
84return removeWorkerData(workerInfo);
85 }
8687/**88 * Gets all the edges and removes them from the cache.89 *90 * @return All vertex edges for all partitions91 */92public PairList<WorkerInfo, PairList<Integer, VertexIdEdges<I, E>>>
93 removeAllEdges() {
94return removeAllData();
95 }
96 }