This project has retired. For details please refer to its Attic page.
WorkerContext xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.List;
25  
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest;
28  import org.apache.giraph.graph.GraphState;
29  import org.apache.hadoop.io.Writable;
30  import org.apache.hadoop.io.WritableComparable;
31  import org.apache.hadoop.mapreduce.Mapper;
32  
33  /**
34   * WorkerContext allows for the execution of user code
35   * on a per-worker basis. There's one WorkerContext per worker.
36   */
37  @SuppressWarnings("rawtypes")
38  public abstract class WorkerContext
39    extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable>
40    implements Writable, WorkerIndexUsage<WritableComparable> {
41    /** Global graph state */
42    private GraphState graphState;
43  
44    /** Service worker */
45    private CentralizedServiceWorker serviceWorker;
46    /** All workers info */
47    private AllWorkersInfo allWorkersInfo;
48  
49    /**
50     * Set the graph state.
51     *
52     * @param graphState Used to set the graph state.
53     */
54    public final void setGraphState(GraphState graphState) {
55      this.graphState = graphState;
56    }
57  
58    /**
59     * Setup superstep.
60     *
61     * @param serviceWorker Service worker containing all the information
62     */
63    public final void setupSuperstep(
64        CentralizedServiceWorker<?, ?, ?> serviceWorker) {
65      this.serviceWorker = serviceWorker;
66      allWorkersInfo = new AllWorkersInfo(
67          serviceWorker.getWorkerInfoList(), serviceWorker.getWorkerInfo());
68    }
69  
70    /**
71     * Initialize the WorkerContext.
72     * This method is executed once on each Worker before the first
73     * superstep starts.
74     *
75     * @throws IllegalAccessException Thrown for getting the class
76     * @throws InstantiationException Expected instantiation in this method.
77     */
78    public abstract void preApplication() throws InstantiationException,
79      IllegalAccessException;
80  
81    /**
82     * Finalize the WorkerContext.
83     * This method is executed once on each Worker after the last
84     * superstep ends.
85     */
86    public abstract void postApplication();
87  
88    /**
89     * Execute user code.
90     * This method is executed once on each Worker before each
91     * superstep starts.
92     */
93    public abstract void preSuperstep();
94  
95    /**
96     * Get number of workers
97     *
98     * @return Number of workers
99     */
100   @Override
101   public final int getWorkerCount() {
102     return allWorkersInfo.getWorkerCount();
103   }
104 
105   /**
106    * Get index for this worker
107    *
108    * @return Index of this worker
109    */
110   @Override
111   public final int getMyWorkerIndex() {
112     return allWorkersInfo.getMyWorkerIndex();
113   }
114 
115   @Override
116   public final int getWorkerForVertex(WritableComparable vertexId) {
117     return allWorkersInfo.getWorkerIndex(
118         serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
119   }
120 
121   /**
122    * Get messages which other workers sent to this worker and clear them (can
123    * be called once per superstep)
124    *
125    * @return Messages received
126    */
127   public final List<Writable> getAndClearMessagesFromOtherWorkers() {
128     return serviceWorker.getServerData().
129         getAndClearCurrentWorkerToWorkerMessages();
130   }
131 
132   /**
133    * Send message to another worker
134    *
135    * @param message Message to send
136    * @param workerIndex Index of the worker to send the message to
137    */
138   public final void sendMessageToWorker(Writable message, int workerIndex) {
139     SendWorkerToWorkerMessageRequest request =
140         new SendWorkerToWorkerMessageRequest(message);
141     if (workerIndex == getMyWorkerIndex()) {
142       request.doRequest(serviceWorker.getServerData());
143     } else {
144       serviceWorker.getWorkerClient().sendWritableRequest(
145           allWorkersInfo.getWorkerList().get(workerIndex).getTaskId(), request);
146     }
147   }
148 
149   /**
150    * Execute user code.
151    * This method is executed once on each Worker after each
152    * superstep ends.
153    */
154   public abstract void postSuperstep();
155 
156   /**
157    * Retrieves the current superstep.
158    *
159    * @return Current superstep
160    */
161   public final long getSuperstep() {
162     return graphState.getSuperstep();
163   }
164 
165   /**
166    * Get the total (all workers) number of vertices that
167    * existed in the previous superstep.
168    *
169    * @return Total number of vertices (-1 if first superstep)
170    */
171   public final long getTotalNumVertices() {
172     return graphState.getTotalNumVertices();
173   }
174 
175   /**
176    * Get the total (all workers) number of edges that
177    * existed in the previous superstep.
178    *
179    * @return Total number of edges (-1 if first superstep)
180    */
181   public final long getTotalNumEdges() {
182     return graphState.getTotalNumEdges();
183   }
184 
185   /**
186    * Get the mapper context
187    *
188    * @return Mapper context
189    */
190   public final Mapper.Context getContext() {
191     return graphState.getContext();
192   }
193 
194   /**
195    * Call this to log a line to command line of the job. Use in moderation -
196    * it's a synchronous call to Job client
197    *
198    * @param line Line to print
199    */
200   public final void logToCommandLine(String line) {
201     serviceWorker.getJobProgressTracker().logInfo(line);
202   }
203 
204   @Override
205   public void write(DataOutput dataOutput) throws IOException {
206   }
207 
208   @Override
209   public void readFields(DataInput dataInput) throws IOException {
210   }
211 }