View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.graph.Vertex;
22  import org.apache.giraph.partition.Partition;
23  import org.apache.giraph.partition.PartitionOwner;
24  import org.apache.giraph.worker.WorkerInfo;
25  import org.apache.hadoop.io.Writable;
26  import org.apache.hadoop.io.WritableComparable;
27  
28  import java.io.IOException;
29  import java.util.Iterator;
30  
31  /**
32   * Aggregates IPC requests and sends them off
33   *
34   * @param <I> Vertex index value
35   * @param <V> Vertex value
36   * @param <E> Edge value
37   */
38  public interface WorkerClientRequestProcessor<I extends WritableComparable,
39      V extends Writable, E extends Writable> {
40    /**
41     * Sends a message to destination vertex.
42     *
43     * @param destVertexId Destination vertex id.
44     * @param message Message to send.
45     */
46    void sendMessageRequest(I destVertexId, Writable message);
47  
48    /**
49     * Sends a message through all edges to all destinations.
50     *
51     * @param vertex The source vertex.
52     * @param message  Message to send.
53     */
54    void sendMessageToAllRequest(Vertex<I, V, E> vertex, Writable message);
55  
56    /**
57     * Sends a message to the targets in the iterator.
58     *
59     * @param vertexIdIterator The iterator of target vertex ids.
60     * @param message  Message to send.
61     */
62    void sendMessageToAllRequest(Iterator<I> vertexIdIterator, Writable message);
63  
64    /**
65     * Sends a vertex to the appropriate partition owner
66     *
67     * @param partitionOwner Owner of the vertex
68     * @param vertex Vertex to send
69     * @return Returns true iff any network I/O occurred.
70     */
71    boolean sendVertexRequest(PartitionOwner partitionOwner,
72                              Vertex<I, V, E> vertex);
73  
74    /**
75     * Send a partition request (no batching).
76     *
77     * @param workerInfo Worker to send the partition to
78     * @param partition Partition to send
79     */
80    void sendPartitionRequest(WorkerInfo workerInfo,
81                              Partition<I, V, E> partition);
82  
83    /**
84     * Sends a request to the appropriate vertex range owner to add an edge
85     *
86     * @param vertexIndex Index of the vertex to get the request
87     * @param edge Edge to be added
88     * @throws java.io.IOException
89     */
90    void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
91  
92    /**
93     * Sends a request to the source vertex owner to add an edge.
94     * Note: this request follows an optimized code path used by edge-based
95     * input, and doesn't coordinate with mutations.
96     *
97     * @param sourceVertexId Source vertex id.
98     * @param edge Edge to be added.
99     * @return Returns true iff any network I/O occurred.
100    * @throws IOException
101    */
102   boolean sendEdgeRequest(I sourceVertexId, Edge<I, E> edge)
103     throws IOException;
104 
105   /**
106    * Sends a request to the appropriate vertex range owner to remove all edges
107    * pointing to a given vertex.
108    *
109    * @param vertexIndex Index of the vertex to get the request
110    * @param destinationVertexIndex Index of the edge to be removed
111    * @throws IOException
112    */
113   void removeEdgesRequest(I vertexIndex, I destinationVertexIndex)
114     throws IOException;
115 
116   /**
117    * Sends a request to the appropriate vertex range owner to add a vertex
118    *
119    * @param vertex Vertex to be added
120    * @throws IOException
121    */
122   void addVertexRequest(Vertex<I, V, E> vertex) throws IOException;
123 
124   /**
125    * Sends a request to the appropriate vertex range owner to remove a vertex
126    *
127    * @param vertexIndex Index of the vertex to be removed
128    * @throws IOException
129    */
130   void removeVertexRequest(I vertexIndex) throws IOException;
131 
132   /**
133    * Flush all outgoing messages.  This ensures that all the messages have been
134    * sent, but not guaranteed to have been delivered yet.
135    *
136    * @throws IOException
137    */
138   void flush() throws IOException;
139 
140   /**
141    * Get the messages sent during this superstep and clear them.
142    *
143    * @return Number of messages sent before the reset.
144    */
145   long resetMessageCount();
146 
147   /**
148    * Get the message bytes sent during this superstep and clear them.
149    *
150    * @return Bytes of messages sent before the reset.
151    */
152   long resetMessageBytesCount();
153 }