1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.giraph.jython;
19
20 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21 import org.apache.giraph.edge.Edge;
22 import org.apache.giraph.edge.OutEdges;
23 import org.apache.giraph.graph.GraphType;
24 import org.apache.giraph.graph.Vertex;
25 import org.apache.giraph.worker.WorkerContext;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.Mapper;
29
30 import java.io.IOException;
31 import java.util.Iterator;
32
33 /**
34 * Base class for writing computations in Jython.
35 *
36 * Note that this class DOES NOT implement
37 * {@link org.apache.giraph.graph.Computation}.
38 * This is because we want to support passing in pure Jython types,
39 * and implementing the {@link org.apache.giraph.graph.Computation}
40 * requires passing in {@link Writable}s.
41 * Calling such methods from Jython would throw errors. So, instead,
42 * we have recreated the methods with the same name here. In each method
43 * we check if the type is a pure Jython value, and if so wrap it in
44 * the necessary
45 * {@link org.apache.giraph.jython.wrappers.JythonWritableWrapper}.
46 *
47 * This class works together with {@link JythonGiraphComputation} which takes
48 * care of the {@link org.apache.giraph.graph.Computation}
49 * Giraph infrastructure side of things.
50 */
51 public abstract class JythonComputation extends
52 DefaultImmutableClassesGiraphConfigurable {
53 /** The computation to callback to */
54 private JythonGiraphComputation giraphCompute;
55
56 public void setGiraphCompute(JythonGiraphComputation giraphCompute) {
57 this.giraphCompute = giraphCompute;
58 }
59
60 /**
61 * User's computation function
62 *
63 * @param vertex the Vertex to compute on
64 * @param messages iterable of messages
65 */
66 public abstract void compute(Object vertex, Iterable messages);
67
68 /**
69 * Prepare for computation. This method is executed exactly once prior to
70 * {@link #compute(Object, Iterable)} being called for any of the vertices
71 * in the partition.
72 */
73 public void preSuperstep() { }
74
75 /**
76 * Finish computation. This method is executed exactly once after computation
77 * for all vertices in the partition is complete.
78 */
79 public void postSuperstep() { }
80
81 /**
82 * Retrieves the current superstep.
83 *
84 * @return Current superstep
85 */
86 public long getSuperstep() {
87 return giraphCompute.getSuperstep();
88 }
89
90 /**
91 * Get the total (all workers) number of vertices that
92 * existed in the previous superstep.
93 *
94 * @return Total number of vertices (-1 if first superstep)
95 */
96 public long getTotalNumVertices() {
97 return giraphCompute.getTotalNumVertices();
98 }
99
100 /**
101 * Get the total (all workers) number of edges that
102 * existed in the previous superstep.
103 *
104 * @return Total number of edges (-1 if first superstep)
105 */
106 public long getTotalNumEdges() {
107 return giraphCompute.getTotalNumEdges();
108 }
109
110 /**
111 * Send a message to a vertex id.
112 *
113 * @param id Vertex id to send the message to
114 * @param message Message data to send
115 */
116 public void sendMessage(Object id, Object message) {
117 WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
118 Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
119 GraphType.OUTGOING_MESSAGE_VALUE);
120 giraphCompute.sendMessage(wrappedId, wrappedMessage);
121 }
122
123 /**
124 * Send a message to all edges.
125 *
126 * @param vertex Vertex whose edges to send the message to.
127 * @param message Message sent to all edges.
128 */
129 public void sendMessageToAllEdges(Vertex vertex, Object message) {
130 Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
131 GraphType.OUTGOING_MESSAGE_VALUE);
132 giraphCompute.sendMessageToAllEdges(vertex, wrappedMessage);
133 }
134
135 /**
136 * Send a message to multiple target vertex ids in the iterator.
137 *
138 * @param vertexIdIterator An iterator to multiple target vertex ids.
139 * @param message Message sent to all targets in the iterator.
140 */
141 public void sendMessageToMultipleEdges(Iterator vertexIdIterator,
142 Object message) {
143 Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
144 GraphType.OUTGOING_MESSAGE_VALUE);
145 giraphCompute.sendMessageToMultipleEdges(vertexIdIterator, wrappedMessage);
146 }
147
148 /**
149 * Sends a request to create a vertex that will be available during the
150 * next superstep.
151 *
152 * @param id Vertex id
153 * @param vertexValue Vertex value
154 * @param edges Initial edges
155 */
156 public void addVertexRequest(Object id, Object vertexValue,
157 OutEdges edges) throws IOException {
158 WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
159 Writable wrappedValue = giraphCompute.wrapIfNecessary(vertexValue,
160 GraphType.VERTEX_VALUE);
161 giraphCompute.addVertexRequest(wrappedId, wrappedValue, edges);
162 }
163
164 /**
165 * Sends a request to create a vertex that will be available during the
166 * next superstep.
167 *
168 * @param id Vertex id
169 * @param vertexValue Vertex value
170 */
171 public void addVertexRequest(Object id, Object vertexValue)
172 throws IOException {
173 WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
174 Writable wrappedVertexValue = giraphCompute.wrapIfNecessary(vertexValue,
175 GraphType.VERTEX_VALUE);
176 giraphCompute.addVertexRequest(wrappedId, wrappedVertexValue);
177 }
178
179 /**
180 * Request to remove a vertex from the graph
181 * (applied just prior to the next superstep).
182 *
183 * @param id Id of the vertex to be removed.
184 */
185 public void removeVertexRequest(Object id) throws IOException {
186 WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
187 giraphCompute.removeVertexRequest(wrappedId);
188 }
189
190 /**
191 * Request to add an edge of a vertex in the graph
192 * (processed just prior to the next superstep)
193 *
194 * @param sourceVertexId Source vertex id of edge
195 * @param edge Edge to add
196 */
197 public void addEdgeRequest(Object sourceVertexId, Edge edge)
198 throws IOException {
199 WritableComparable wrappedSourceId =
200 giraphCompute.wrapIdIfNecessary(sourceVertexId);
201 giraphCompute.addEdgeRequest(wrappedSourceId, edge);
202 }
203
204 /**
205 * Request to remove all edges from a given source vertex to a given target
206 * vertex (processed just prior to the next superstep).
207 *
208 * @param sourceVertexId Source vertex id
209 * @param targetVertexId Target vertex id
210 */
211 public void removeEdgesRequest(Object sourceVertexId, Object targetVertexId)
212 throws IOException {
213 WritableComparable wrappedSourceVertexId =
214 giraphCompute.wrapIdIfNecessary(sourceVertexId);
215 WritableComparable wrappedTargetVertexId =
216 giraphCompute.wrapIdIfNecessary(targetVertexId);
217 giraphCompute.removeEdgesRequest(wrappedSourceVertexId,
218 wrappedTargetVertexId);
219 }
220
221 /**
222 * Get the mapper context
223 *
224 * @return Mapper context
225 */
226 public Mapper.Context getContext() {
227 return giraphCompute.getContext();
228 }
229
230 /**
231 * Get the worker context
232 *
233 * @param <W> WorkerContext class
234 * @return WorkerContext context
235 */
236 @SuppressWarnings("unchecked")
237 public <W extends WorkerContext> W getWorkerContext() {
238 return (W) giraphCompute.getWorkerContext();
239 }
240 }
241