View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.giraph;
19  
20  import java.io.IOException;
21  import java.util.Iterator;
22  
23  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26  import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
27  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
28  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.edge.Edge;
31  import org.apache.giraph.edge.OutEdges;
32  import org.apache.giraph.graph.Computation;
33  import org.apache.giraph.graph.Vertex;
34  import org.apache.giraph.types.NoMessage;
35  import org.apache.giraph.worker.WorkerGlobalCommUsage;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.io.WritableComparable;
38  
39  /**
40   * Giraph implementation of BlockWorkerReceiveApi and BlockWorkerSendAPI,
41   * passing all calls to Computation.
42   *
43   * @param <I> Vertex id type
44   * @param <V> Vertex value type
45   * @param <E> Edge value type
46   * @param <M> Message type
47   */
48  @SuppressWarnings("rawtypes")
49  final class BlockWorkerApiWrapper<I extends WritableComparable,
50      V extends Writable, E extends Writable, M extends Writable>
51      implements BlockWorkerReceiveApi<I>, BlockWorkerSendApi<I, V, E, M>,
52      BlockWorkerValueAccessor, WorkerGlobalCommUsage, BlockOutputApi {
53    private final Computation<I, V, E, NoMessage, M> worker;
54  
55    public BlockWorkerApiWrapper(Computation<I, V, E, NoMessage, M> worker) {
56      this.worker = worker;
57    }
58  
59    @Override
60    public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
61      return worker.getConf();
62    }
63  
64    @Override
65    public <A extends Writable> void aggregate(String name, A value) {
66      worker.aggregate(name, value);
67    }
68  
69    @Override
70    public <A extends Writable> A getAggregatedValue(String name) {
71      return worker.getAggregatedValue(name);
72    }
73  
74    @Override
75    public <B extends Writable> B getBroadcast(String name) {
76      return worker.getBroadcast(name);
77    }
78  
79    @Override
80    public void reduce(String name, Object value) {
81      worker.reduce(name, value);
82    }
83  
84    @Override
85    public void reduceMerge(String name, Writable value) {
86      worker.reduceMerge(name, value);
87    }
88  
89    @Override
90    public void sendMessage(I id, M message) {
91      worker.sendMessage(id, message);
92    }
93  
94    @Override
95    public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message) {
96      worker.sendMessageToAllEdges(vertex, message);
97    }
98  
99    @Override
100   public void sendMessageToMultipleEdges(
101       Iterator<I> vertexIdIterator, M message) {
102     worker.sendMessageToMultipleEdges(vertexIdIterator, message);
103   }
104 
105   @Override
106   public void addVertexRequest(I id, V value) {
107     try {
108       worker.addVertexRequest(id, value);
109     } catch (IOException e) {
110       throw new RuntimeException(e);
111     }
112   }
113 
114   @Override
115   public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
116     try {
117       worker.addVertexRequest(id, value, edges);
118     } catch (IOException e) {
119       throw new RuntimeException(e);
120     }
121   }
122 
123   @Override
124   public void removeVertexRequest(I vertexId) {
125     try {
126       worker.removeVertexRequest(vertexId);
127     } catch (IOException e) {
128       throw new RuntimeException(e);
129     }
130   }
131 
132   @Override
133   public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
134     try {
135       worker.addEdgeRequest(sourceVertexId, edge);
136     } catch (IOException e) {
137       throw new RuntimeException(e);
138     }
139   }
140 
141   @Override
142   public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
143     try {
144       worker.removeEdgesRequest(sourceVertexId, targetVertexId);
145     } catch (IOException e) {
146       throw new RuntimeException(e);
147     }
148   }
149 
150   private BlockWorkerContext getBlockWorkerContext() {
151     return (BlockWorkerContext) worker.getWorkerContext();
152   }
153 
154   @Override
155   public Object getWorkerValue() {
156     return getBlockWorkerContext().getWorkerValue();
157   }
158 
159   @Override
160   public long getTotalNumEdges() {
161     return worker.getTotalNumEdges();
162   }
163 
164   @Override
165   public long getTotalNumVertices() {
166     return worker.getTotalNumVertices();
167   }
168 
169   @Override
170   public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
171   OD getOutputDesc(String confOption) {
172     return getBlockWorkerContext().getOutputHandle().<OW, OD>getOutputDesc(
173         confOption);
174   }
175 
176   @Override
177   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
178     return getBlockWorkerContext().getOutputHandle().getWriter(confOption);
179   }
180 
181   @Override
182   public int getMyWorkerIndex() {
183     return worker.getMyWorkerIndex();
184   }
185 
186   @Override
187   public int getWorkerCount() {
188     return worker.getWorkerCount();
189   }
190 
191   @Override
192   public int getWorkerForVertex(I vertexId) {
193     return worker.getWorkerForVertex(vertexId);
194   }
195 }