This project has retired. For details please refer to its Attic page.
MigrationWorkerContext xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.List;
24  
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  
31  /**
32   * Replacement for WorkerContext when migrating to Blocks Framework,
33   * disallowing functions that are tied to execution order.
34   */
35  @SuppressWarnings({"rawtypes", "unchecked"})
36  public class MigrationWorkerContext
37      extends DefaultImmutableClassesGiraphConfigurable
38      implements Writable {
39    private BlockWorkerContextApi api;
40    private List<Writable> receivedMessages;
41  
42    public void setApi(BlockWorkerContextApi api) {
43      this.api = api;
44      this.setConf(api.getConf());
45    }
46  
47    public void setReceivedMessages(List<Writable> receivedMessages) {
48      this.receivedMessages = receivedMessages;
49    }
50  
51    public void preSuperstep() { }
52  
53    public void postSuperstep() { }
54  
55    @SuppressWarnings("deprecation")
56    public long getTotalNumVertices() {
57      return api.getTotalNumVertices();
58    }
59  
60    @SuppressWarnings("deprecation")
61    public long getTotalNumEdges() {
62      return api.getTotalNumEdges();
63    }
64  
65    @Override
66    public void readFields(DataInput in) throws IOException {
67    }
68  
69    @Override
70    public void write(DataOutput out) throws IOException {
71    }
72  
73    public final int getWorkerCount() {
74      return api.getWorkerCount();
75    }
76  
77    public final int getMyWorkerIndex() {
78      return api.getMyWorkerIndex();
79    }
80  
81    public final List<Writable> getAndClearMessagesFromOtherWorkers() {
82      List<Writable> ret = receivedMessages;
83      receivedMessages = null;
84      return ret;
85    }
86  
87    public final void sendMessageToWorker(Writable message, int workerIndex) {
88      ((BlockWorkerContextSendApi<WritableComparable, Writable>) api)
89        .sendMessageToWorker(message, workerIndex);
90    }
91  
92    /**
93     * Drop-in replacement for WorkerContext when migrating to
94     * Blocks Framework.
95     */
96    public static class MigrationFullWorkerContext
97        extends MigrationWorkerContext {
98      public void preApplication()
99          throws InstantiationException, IllegalAccessException {
100     }
101 
102     public void postApplication() { }
103   }
104 }