This project has retired. For details please refer to its Attic page.
BlockWorkerApiWrapper xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.giraph;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  
23  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
27  import org.apache.giraph.block_app.framework.api.Counter;
28  import org.apache.giraph.block_app.framework.internal.BlockCounters;
29  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
30  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.edge.Edge;
33  import org.apache.giraph.edge.OutEdges;
34  import org.apache.giraph.graph.Computation;
35  import org.apache.giraph.graph.Vertex;
36  import org.apache.giraph.types.NoMessage;
37  import org.apache.giraph.worker.WorkerGlobalCommUsage;
38  import org.apache.hadoop.io.Writable;
39  import org.apache.hadoop.io.WritableComparable;
40  
41  /**
42   * Giraph implementation of BlockWorkerReceiveApi and BlockWorkerSendAPI,
43   * passing all calls to Computation.
44   *
45   * @param <I> Vertex id type
46   * @param <V> Vertex value type
47   * @param <E> Edge value type
48   * @param <M> Message type
49   */
50  @SuppressWarnings("rawtypes")
51  final class BlockWorkerApiWrapper<I extends WritableComparable,
52      V extends Writable, E extends Writable, M extends Writable>
53      implements BlockWorkerReceiveApi<I>, BlockWorkerSendApi<I, V, E, M>,
54      BlockWorkerValueAccessor, WorkerGlobalCommUsage, BlockOutputApi {
55    private final Computation<I, V, E, NoMessage, M> worker;
56  
57    public BlockWorkerApiWrapper(Computation<I, V, E, NoMessage, M> worker) {
58      this.worker = worker;
59    }
60  
61    @Override
62    public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
63      return worker.getConf();
64    }
65  
66    @Override
67    public <A extends Writable> void aggregate(String name, A value) {
68      worker.aggregate(name, value);
69    }
70  
71    @Override
72    public <A extends Writable> A getAggregatedValue(String name) {
73      return worker.getAggregatedValue(name);
74    }
75  
76    @Override
77    public <B extends Writable> B getBroadcast(String name) {
78      return worker.getBroadcast(name);
79    }
80  
81    @Override
82    public void reduce(String name, Object value) {
83      worker.reduce(name, value);
84    }
85  
86    @Override
87    public void reduceMerge(String name, Writable value) {
88      worker.reduceMerge(name, value);
89    }
90  
91    @Override
92    public void sendMessage(I id, M message) {
93      worker.sendMessage(id, message);
94    }
95  
96    @Override
97    public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message) {
98      worker.sendMessageToAllEdges(vertex, message);
99    }
100 
101   @Override
102   public void sendMessageToMultipleEdges(
103       Iterator<I> vertexIdIterator, M message) {
104     worker.sendMessageToMultipleEdges(vertexIdIterator, message);
105   }
106 
107   @Override
108   public void addVertexRequest(I id, V value) {
109     try {
110       worker.addVertexRequest(id, value);
111     } catch (IOException e) {
112       throw new RuntimeException(e);
113     }
114   }
115 
116   @Override
117   public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
118     try {
119       worker.addVertexRequest(id, value, edges);
120     } catch (IOException e) {
121       throw new RuntimeException(e);
122     }
123   }
124 
125   @Override
126   public void removeVertexRequest(I vertexId) {
127     try {
128       worker.removeVertexRequest(vertexId);
129     } catch (IOException e) {
130       throw new RuntimeException(e);
131     }
132   }
133 
134   @Override
135   public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
136     try {
137       worker.addEdgeRequest(sourceVertexId, edge);
138     } catch (IOException e) {
139       throw new RuntimeException(e);
140     }
141   }
142 
143   @Override
144   public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
145     try {
146       worker.removeEdgesRequest(sourceVertexId, targetVertexId);
147     } catch (IOException e) {
148       throw new RuntimeException(e);
149     }
150   }
151 
152   private BlockWorkerContext getBlockWorkerContext() {
153     return (BlockWorkerContext) worker.getWorkerContext();
154   }
155 
156   @Override
157   public Object getWorkerValue() {
158     return getBlockWorkerContext().getWorkerValue();
159   }
160 
161   @Override
162   public long getTotalNumEdges() {
163     return worker.getTotalNumEdges();
164   }
165 
166   @Override
167   public long getTotalNumVertices() {
168     return worker.getTotalNumVertices();
169   }
170 
171   @Override
172   public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
173   OD getOutputDesc(String confOption) {
174     return getBlockWorkerContext().getOutputHandle().<OW, OD>getOutputDesc(
175         confOption);
176   }
177 
178   @Override
179   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
180     return getBlockWorkerContext().getOutputHandle().getWriter(confOption);
181   }
182 
183   @Override
184   public int getMyWorkerIndex() {
185     return worker.getMyWorkerIndex();
186   }
187 
188   @Override
189   public int getWorkerCount() {
190     return worker.getWorkerCount();
191   }
192 
193   @Override
194   public int getWorkerForVertex(I vertexId) {
195     return worker.getWorkerForVertex(vertexId);
196   }
197 
198   @Override
199   public Counter getCounter(String group, String name) {
200     return BlockCounters.getCounter(worker.getContext(), group, name);
201   }
202 
203   @Override
204   public void progress() {
205     worker.getContext().progress();
206   }
207 
208   @Override
209   public void setStatus(String status) {
210     worker.getContext().setStatus(status);
211   }
212 }