This project has retired. For details please refer to its Attic page.
MigrationAbstractComputation xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.migration;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.Iterator;
24  
25  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26  import org.apache.giraph.conf.TypesHolder;
27  import org.apache.giraph.edge.Edge;
28  import org.apache.giraph.edge.OutEdges;
29  import org.apache.giraph.graph.Vertex;
30  import org.apache.giraph.worker.WorkerAggregatorDelegator;
31  import org.apache.giraph.worker.WorkerGlobalCommUsage;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  
35  /**
36   * Replacement for AbstractComputation when migrating to
37   * Blocks Framework, disallowing functions that are tied to
38   * execution order.
39   *
40   * @param <I> Vertex ID
41   * @param <V> Vertex Value
42   * @param <E> Edge Value
43   * @param <M1> Incoming Message Value
44   * @param <M2> Outgoing Message Value
45   */
46  @SuppressWarnings("rawtypes")
47  public class MigrationAbstractComputation<I extends WritableComparable,
48      V extends Writable, E extends Writable, M1 extends Writable,
49      M2 extends Writable> extends WorkerAggregatorDelegator<I, V, E>
50      implements TypesHolder<I, V, E, M1, M2>, Writable {
51    private BlockWorkerSendApi<I, V, E, M2> api;
52    private MigrationWorkerContext workerContext;
53    private long superstep;
54  
55    final void init(
56        BlockWorkerSendApi<I, V, E, M2> workerApi,
57        MigrationWorkerContext workerContext,
58        long superstep) {
59      this.api = workerApi;
60      this.workerContext = workerContext;
61      this.superstep = superstep;
62      setWorkerGlobalCommUsage((WorkerGlobalCommUsage) workerApi);
63      setConf(workerApi.getConf());
64    }
65  
66    public void compute(Vertex<I, V, E> vertex,
67        Iterable<M1> messages) throws IOException {
68    }
69  
70    @Override
71    public void readFields(DataInput in) throws IOException {
72    }
73  
74    @Override
75    public void write(DataOutput out) throws IOException {
76    }
77  
78    public void preSuperstep() {
79    }
80  
81    public void postSuperstep() {
82    }
83  
84    @SuppressWarnings("deprecation")
85    public long getTotalNumVertices() {
86      return api.getTotalNumVertices();
87    }
88  
89    @SuppressWarnings("deprecation")
90    public long getTotalNumEdges() {
91      return api.getTotalNumEdges();
92    }
93  
94    public void sendMessage(I id, M2 message) {
95      api.sendMessage(id, message);
96    }
97  
98    public final void sendMessageToAllEdges(Vertex<I, V, E> vertex, M2 message) {
99      api.sendMessageToAllEdges(vertex, message);
100   }
101 
102   public final void sendMessageToMultipleEdges(
103       Iterator<I> vertexIdIterator, M2 message) {
104     api.sendMessageToMultipleEdges(vertexIdIterator, message);
105   }
106 
107   public final void addVertexRequest(I id, V value,
108       OutEdges<I, E> edges) throws IOException {
109     api.addVertexRequest(id, value, edges);
110   }
111 
112   public final void addVertexRequest(I id, V value) throws IOException {
113     api.addVertexRequest(id, value);
114   }
115 
116   public final void removeVertexRequest(I vertexId) throws IOException {
117     api.removeVertexRequest(vertexId);
118   }
119 
120   public final void addEdgeRequest(I sourceVertexId,
121       Edge<I, E> edge) throws IOException {
122     api.addEdgeRequest(sourceVertexId, edge);
123   }
124 
125   public final void removeEdgesRequest(I sourceVertexId,
126       I targetVertexId) throws IOException {
127     api.removeEdgesRequest(sourceVertexId, targetVertexId);
128   }
129 
130   @SuppressWarnings("unchecked")
131   public <W extends MigrationWorkerContext> W getWorkerContext() {
132     return (W) workerContext;
133   }
134 
135   /**
136    * Drop-in replacement for BasicComputation when migrating to
137    * Blocks Framework, disallowing functions that are tied to
138    * execution order.
139    *
140    * @param <I> Vertex ID
141    * @param <V> Vertex Value
142    * @param <E> Edge Value
143    * @param <M> Message type
144    */
145   public static class MigrationBasicComputation<I extends WritableComparable,
146       V extends Writable, E extends Writable, M extends Writable>
147       extends MigrationAbstractComputation<I, V, E, M, M> {
148   }
149 
150   /**
151    * Drop-in replacement for AbstractComputation when migrating to
152    * Blocks Framework.
153    *
154    * @param <I> Vertex ID
155    * @param <V> Vertex Value
156    * @param <E> Edge Value
157    * @param <M1> Incoming Message Value
158    * @param <M2> Outgoing Message Value
159    */
160   public static class MigrationFullAbstractComputation
161       <I extends WritableComparable, V extends Writable, E extends Writable,
162       M1 extends Writable, M2 extends Writable>
163       extends MigrationAbstractComputation<I, V, E, M1, M2> {
164     public long getSuperstep() {
165       return super.superstep;
166     }
167   }
168 
169   /**
170    * Drop-in replacement for BasicComputation when migrating to
171    * Blocks Framework.
172    *
173    * @param <I> Vertex ID
174    * @param <V> Vertex Value
175    * @param <E> Edge Value
176    * @param <M> Message type
177    */
178   public static class MigrationFullBasicComputation
179       <I extends WritableComparable, V extends Writable, E extends Writable,
180       M extends Writable>
181       extends MigrationFullAbstractComputation<I, V, E, M, M> {
182   }
183 }