1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.comm;
19
20 import org.apache.giraph.edge.Edge;
21 import org.apache.giraph.graph.Vertex;
22 import org.apache.giraph.partition.Partition;
23 import org.apache.giraph.partition.PartitionOwner;
24 import org.apache.giraph.worker.WorkerInfo;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27
28 import java.io.IOException;
29 import java.util.Iterator;
30
31 /**
32 * Aggregates IPC requests and sends them off
33 *
34 * @param <I> Vertex index value
35 * @param <V> Vertex value
36 * @param <E> Edge value
37 */
38 public interface WorkerClientRequestProcessor<I extends WritableComparable,
39 V extends Writable, E extends Writable> {
40 /**
41 * Sends a message to destination vertex.
42 *
43 * @param destVertexId Destination vertex id.
44 * @param message Message to send.
45 */
46 void sendMessageRequest(I destVertexId, Writable message);
47
48 /**
49 * Sends a message through all edges to all destinations.
50 *
51 * @param vertex The source vertex.
52 * @param message Message to send.
53 */
54 void sendMessageToAllRequest(Vertex<I, V, E> vertex, Writable message);
55
56 /**
57 * Sends a message to the targets in the iterator.
58 *
59 * @param vertexIdIterator The iterator of target vertex ids.
60 * @param message Message to send.
61 */
62 void sendMessageToAllRequest(Iterator<I> vertexIdIterator, Writable message);
63
64 /**
65 * Sends a vertex to the appropriate partition owner
66 *
67 * @param partitionOwner Owner of the vertex
68 * @param vertex Vertex to send
69 * @return Returns true iff any network I/O occurred.
70 */
71 boolean sendVertexRequest(PartitionOwner partitionOwner,
72 Vertex<I, V, E> vertex);
73
74 /**
75 * Send a partition request (no batching).
76 *
77 * @param workerInfo Worker to send the partition to
78 * @param partition Partition to send
79 */
80 void sendPartitionRequest(WorkerInfo workerInfo,
81 Partition<I, V, E> partition);
82
83 /**
84 * Sends a request to the appropriate vertex range owner to add an edge
85 *
86 * @param vertexIndex Index of the vertex to get the request
87 * @param edge Edge to be added
88 * @throws java.io.IOException
89 */
90 void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
91
92 /**
93 * Sends a request to the source vertex owner to add an edge.
94 * Note: this request follows an optimized code path used by edge-based
95 * input, and doesn't coordinate with mutations.
96 *
97 * @param sourceVertexId Source vertex id.
98 * @param edge Edge to be added.
99 * @return Returns true iff any network I/O occurred.
100 * @throws IOException
101 */
102 boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
103 throws IOException;
104
105 /**
106 * Sends a request to the appropriate vertex range owner to remove all edges
107 * pointing to a given vertex.
108 *
109 * @param vertexIndex Index of the vertex to get the request
110 * @param destinationVertexIndex Index of the edge to be removed
111 * @throws IOException
112 */
113 void removeEdgesRequest(I vertexIndex, I destinationVertexIndex)
114 throws IOException;
115
116 /**
117 * Sends a request to the appropriate vertex range owner to add a vertex
118 *
119 * @param vertex Vertex to be added
120 * @throws IOException
121 */
122 void addVertexRequest(Vertex<I, V, E> vertex) throws IOException;
123
124 /**
125 * Sends a request to the appropriate vertex range owner to remove a vertex
126 *
127 * @param vertexIndex Index of the vertex to be removed
128 * @throws IOException
129 */
130 void removeVertexRequest(I vertexIndex) throws IOException;
131
132 /**
133 * Flush all outgoing messages. This ensures that all the messages have been
134 * sent, but not guaranteed to have been delivered yet.
135 *
136 * @throws IOException
137 */
138 void flush() throws IOException;
139
140 /**
141 * Get the messages sent during this superstep and clear them.
142 *
143 * @return Number of messages sent before the reset.
144 */
145 long resetMessageCount();
146
147 /**
148 * Get the message bytes sent during this superstep and clear them.
149 *
150 * @return Bytes of messages sent before the reset.
151 */
152 long resetMessageBytesCount();
153 }