1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.giraph.comm; 19 20 import org.apache.giraph.edge.Edge; 21 import org.apache.giraph.graph.Vertex; 22 import org.apache.giraph.partition.Partition; 23 import org.apache.giraph.partition.PartitionOwner; 24 import org.apache.giraph.worker.WorkerInfo; 25 import org.apache.hadoop.io.Writable; 26 import org.apache.hadoop.io.WritableComparable; 27 28 import java.io.IOException; 29 import java.util.Iterator; 30 31 /** 32 * Aggregates IPC requests and sends them off 33 * 34 * @param <I> Vertex index value 35 * @param <V> Vertex value 36 * @param <E> Edge value 37 */ 38 public interface WorkerClientRequestProcessor<I extends WritableComparable, 39 V extends Writable, E extends Writable> { 40 /** 41 * Sends a message to destination vertex. 42 * 43 * @param destVertexId Destination vertex id. 44 * @param message Message to send. 45 */ 46 void sendMessageRequest(I destVertexId, Writable message); 47 48 /** 49 * Sends a message through all edges to all destinations. 50 * 51 * @param vertex The source vertex. 52 * @param message Message to send. 53 */ 54 void sendMessageToAllRequest(Vertex<I, V, E> vertex, Writable message); 55 56 /** 57 * Sends a message to the targets in the iterator. 58 * 59 * @param vertexIdIterator The iterator of target vertex ids. 60 * @param message Message to send. 61 */ 62 void sendMessageToAllRequest(Iterator<I> vertexIdIterator, Writable message); 63 64 /** 65 * Sends a vertex to the appropriate partition owner 66 * 67 * @param partitionOwner Owner of the vertex 68 * @param vertex Vertex to send 69 * @return Returns true iff any network I/O occurred. 70 */ 71 boolean sendVertexRequest(PartitionOwner partitionOwner, 72 Vertex<I, V, E> vertex); 73 74 /** 75 * Send a partition request (no batching). 76 * 77 * @param workerInfo Worker to send the partition to 78 * @param partition Partition to send 79 */ 80 void sendPartitionRequest(WorkerInfo workerInfo, 81 Partition<I, V, E> partition); 82 83 /** 84 * Sends a request to the appropriate vertex range owner to add an edge 85 * 86 * @param vertexIndex Index of the vertex to get the request 87 * @param edge Edge to be added 88 * @throws java.io.IOException 89 */ 90 void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException; 91 92 /** 93 * Sends a request to the source vertex owner to add an edge. 94 * Note: this request follows an optimized code path used by edge-based 95 * input, and doesn't coordinate with mutations. 96 * 97 * @param sourceVertexId Source vertex id. 98 * @param edge Edge to be added. 99 * @return Returns true iff any network I/O occurred. 100 * @throws IOException 101 */ 102 boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge) 103 throws IOException; 104 105 /** 106 * Sends a request to the appropriate vertex range owner to remove all edges 107 * pointing to a given vertex. 108 * 109 * @param vertexIndex Index of the vertex to get the request 110 * @param destinationVertexIndex Index of the edge to be removed 111 * @throws IOException 112 */ 113 void removeEdgesRequest(I vertexIndex, I destinationVertexIndex) 114 throws IOException; 115 116 /** 117 * Sends a request to the appropriate vertex range owner to add a vertex 118 * 119 * @param vertex Vertex to be added 120 * @throws IOException 121 */ 122 void addVertexRequest(Vertex<I, V, E> vertex) throws IOException; 123 124 /** 125 * Sends a request to the appropriate vertex range owner to remove a vertex 126 * 127 * @param vertexIndex Index of the vertex to be removed 128 * @throws IOException 129 */ 130 void removeVertexRequest(I vertexIndex) throws IOException; 131 132 /** 133 * Flush all outgoing messages. This ensures that all the messages have been 134 * sent, but not guaranteed to have been delivered yet. 135 * 136 * @throws IOException 137 */ 138 void flush() throws IOException; 139 140 /** 141 * Get the messages sent during this superstep and clear them. 142 * 143 * @return Number of messages sent before the reset. 144 */ 145 long resetMessageCount(); 146 147 /** 148 * Get the message bytes sent during this superstep and clear them. 149 * 150 * @return Bytes of messages sent before the reset. 151 */ 152 long resetMessageBytesCount(); 153 }