This project has retired. For details please refer to its Attic page.
JythonComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.jython;
19  
20  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21  import org.apache.giraph.edge.Edge;
22  import org.apache.giraph.edge.OutEdges;
23  import org.apache.giraph.graph.GraphType;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.giraph.worker.WorkerContext;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.mapreduce.Mapper;
29  
30  import java.io.IOException;
31  import java.util.Iterator;
32  
33  /**
34   * Base class for writing computations in Jython.
35   *
36   * Note that this class DOES NOT implement
37   * {@link org.apache.giraph.graph.Computation}.
38   * This is because we want to support passing in pure Jython types,
39   * and implementing the {@link org.apache.giraph.graph.Computation}
40   * requires passing in {@link Writable}s.
41   * Calling such methods from Jython would throw errors. So, instead,
42   * we have recreated the methods with the same name here. In each method
43   * we check if the type is a pure Jython value, and if so wrap it in
44   * the necessary
45   * {@link org.apache.giraph.jython.wrappers.JythonWritableWrapper}.
46   *
47   * This class works together with {@link JythonGiraphComputation} which takes
48   * care of the {@link org.apache.giraph.graph.Computation}
49   * Giraph infrastructure side of things.
50   */
51  public abstract class JythonComputation extends
52      DefaultImmutableClassesGiraphConfigurable {
53    /** The computation to callback to */
54    private JythonGiraphComputation giraphCompute;
55  
56    public void setGiraphCompute(JythonGiraphComputation giraphCompute) {
57      this.giraphCompute = giraphCompute;
58    }
59  
60    /**
61     * User's computation function
62     *
63     * @param vertex the Vertex to compute on
64     * @param messages iterable of messages
65     */
66    public abstract void compute(Object vertex, Iterable messages);
67  
68    /**
69     * Prepare for computation. This method is executed exactly once prior to
70     * {@link #compute(Object, Iterable)} being called for any of the vertices
71     * in the partition.
72     */
73    public void preSuperstep() { }
74  
75    /**
76     * Finish computation. This method is executed exactly once after computation
77     * for all vertices in the partition is complete.
78     */
79    public void postSuperstep() { }
80  
81    /**
82     * Retrieves the current superstep.
83     *
84     * @return Current superstep
85     */
86    public long getSuperstep() {
87      return giraphCompute.getSuperstep();
88    }
89  
90    /**
91     * Get the total (all workers) number of vertices that
92     * existed in the previous superstep.
93     *
94     * @return Total number of vertices (-1 if first superstep)
95     */
96    public long getTotalNumVertices() {
97      return giraphCompute.getTotalNumVertices();
98    }
99  
100   /**
101    * Get the total (all workers) number of edges that
102    * existed in the previous superstep.
103    *
104    * @return Total number of edges (-1 if first superstep)
105    */
106   public long getTotalNumEdges() {
107     return giraphCompute.getTotalNumEdges();
108   }
109 
110   /**
111    * Send a message to a vertex id.
112    *
113    * @param id Vertex id to send the message to
114    * @param message Message data to send
115    */
116   public void sendMessage(Object id, Object message) {
117     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
118     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
119         GraphType.OUTGOING_MESSAGE_VALUE);
120     giraphCompute.sendMessage(wrappedId, wrappedMessage);
121   }
122 
123   /**
124    * Send a message to all edges.
125    *
126    * @param vertex Vertex whose edges to send the message to.
127    * @param message Message sent to all edges.
128    */
129   public void sendMessageToAllEdges(Vertex vertex, Object message) {
130     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
131         GraphType.OUTGOING_MESSAGE_VALUE);
132     giraphCompute.sendMessageToAllEdges(vertex, wrappedMessage);
133   }
134 
135   /**
136    * Send a message to multiple target vertex ids in the iterator.
137    *
138    * @param vertexIdIterator An iterator to multiple target vertex ids.
139    * @param message Message sent to all targets in the iterator.
140    */
141   public void sendMessageToMultipleEdges(Iterator vertexIdIterator,
142       Object message) {
143     Writable wrappedMessage = giraphCompute.wrapIfNecessary(message,
144         GraphType.OUTGOING_MESSAGE_VALUE);
145     giraphCompute.sendMessageToMultipleEdges(vertexIdIterator, wrappedMessage);
146   }
147 
148   /**
149    * Sends a request to create a vertex that will be available during the
150    * next superstep.
151    *
152    * @param id Vertex id
153    * @param vertexValue Vertex value
154    * @param edges Initial edges
155    */
156   public void addVertexRequest(Object id, Object vertexValue,
157       OutEdges edges) throws IOException {
158     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
159     Writable wrappedValue = giraphCompute.wrapIfNecessary(vertexValue,
160         GraphType.VERTEX_VALUE);
161     giraphCompute.addVertexRequest(wrappedId, wrappedValue, edges);
162   }
163 
164   /**
165    * Sends a request to create a vertex that will be available during the
166    * next superstep.
167    *
168    * @param id Vertex id
169    * @param vertexValue Vertex value
170    */
171   public void addVertexRequest(Object id, Object vertexValue)
172     throws IOException {
173     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
174     Writable wrappedVertexValue = giraphCompute.wrapIfNecessary(vertexValue,
175         GraphType.VERTEX_VALUE);
176     giraphCompute.addVertexRequest(wrappedId, wrappedVertexValue);
177   }
178 
179   /**
180    * Request to remove a vertex from the graph
181    * (applied just prior to the next superstep).
182    *
183    * @param id Id of the vertex to be removed.
184    */
185   public void removeVertexRequest(Object id) throws IOException {
186     WritableComparable wrappedId = giraphCompute.wrapIdIfNecessary(id);
187     giraphCompute.removeVertexRequest(wrappedId);
188   }
189 
190   /**
191    * Request to add an edge of a vertex in the graph
192    * (processed just prior to the next superstep)
193    *
194    * @param sourceVertexId Source vertex id of edge
195    * @param edge Edge to add
196    */
197   public void addEdgeRequest(Object sourceVertexId, Edge edge)
198     throws IOException {
199     WritableComparable wrappedSourceId =
200         giraphCompute.wrapIdIfNecessary(sourceVertexId);
201     giraphCompute.addEdgeRequest(wrappedSourceId, edge);
202   }
203 
204   /**
205    * Request to remove all edges from a given source vertex to a given target
206    * vertex (processed just prior to the next superstep).
207    *
208    * @param sourceVertexId Source vertex id
209    * @param targetVertexId Target vertex id
210    */
211   public void removeEdgesRequest(Object sourceVertexId, Object targetVertexId)
212     throws IOException {
213     WritableComparable wrappedSourceVertexId =
214         giraphCompute.wrapIdIfNecessary(sourceVertexId);
215     WritableComparable wrappedTargetVertexId =
216         giraphCompute.wrapIdIfNecessary(targetVertexId);
217     giraphCompute.removeEdgesRequest(wrappedSourceVertexId,
218         wrappedTargetVertexId);
219   }
220 
221   /**
222    * Get the mapper context
223    *
224    * @return Mapper context
225    */
226   public Mapper.Context getContext() {
227     return giraphCompute.getContext();
228   }
229 
230   /**
231    * Get the worker context
232    *
233    * @param <W> WorkerContext class
234    * @return WorkerContext context
235    */
236   @SuppressWarnings("unchecked")
237   public <W extends WorkerContext> W getWorkerContext() {
238     return (W) giraphCompute.getWorkerContext();
239   }
240 }
241