This project has retired. For details please refer to its
Attic page.
BlockWorkerApiWrapper xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.framework.api.giraph;
19
20 import java.io.IOException;
21 import java.util.Iterator;
22
23 import org.apache.giraph.block_app.framework.api.BlockOutputApi;
24 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
25 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
27 import org.apache.giraph.block_app.framework.api.Counter;
28 import org.apache.giraph.block_app.framework.internal.BlockCounters;
29 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
30 import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
31 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32 import org.apache.giraph.edge.Edge;
33 import org.apache.giraph.edge.OutEdges;
34 import org.apache.giraph.graph.Computation;
35 import org.apache.giraph.graph.Vertex;
36 import org.apache.giraph.types.NoMessage;
37 import org.apache.giraph.worker.WorkerGlobalCommUsage;
38 import org.apache.hadoop.io.Writable;
39 import org.apache.hadoop.io.WritableComparable;
40
41
42
43
44
45
46
47
48
49
50 @SuppressWarnings("rawtypes")
51 final class BlockWorkerApiWrapper<I extends WritableComparable,
52 V extends Writable, E extends Writable, M extends Writable>
53 implements BlockWorkerReceiveApi<I>, BlockWorkerSendApi<I, V, E, M>,
54 BlockWorkerValueAccessor, WorkerGlobalCommUsage, BlockOutputApi {
55 private final Computation<I, V, E, NoMessage, M> worker;
56
57 public BlockWorkerApiWrapper(Computation<I, V, E, NoMessage, M> worker) {
58 this.worker = worker;
59 }
60
61 @Override
62 public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
63 return worker.getConf();
64 }
65
66 @Override
67 public <A extends Writable> void aggregate(String name, A value) {
68 worker.aggregate(name, value);
69 }
70
71 @Override
72 public <A extends Writable> A getAggregatedValue(String name) {
73 return worker.getAggregatedValue(name);
74 }
75
76 @Override
77 public <B extends Writable> B getBroadcast(String name) {
78 return worker.getBroadcast(name);
79 }
80
81 @Override
82 public void reduce(String name, Object value) {
83 worker.reduce(name, value);
84 }
85
86 @Override
87 public void reduceMerge(String name, Writable value) {
88 worker.reduceMerge(name, value);
89 }
90
91 @Override
92 public void sendMessage(I id, M message) {
93 worker.sendMessage(id, message);
94 }
95
96 @Override
97 public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message) {
98 worker.sendMessageToAllEdges(vertex, message);
99 }
100
101 @Override
102 public void sendMessageToMultipleEdges(
103 Iterator<I> vertexIdIterator, M message) {
104 worker.sendMessageToMultipleEdges(vertexIdIterator, message);
105 }
106
107 @Override
108 public void addVertexRequest(I id, V value) {
109 try {
110 worker.addVertexRequest(id, value);
111 } catch (IOException e) {
112 throw new RuntimeException(e);
113 }
114 }
115
116 @Override
117 public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
118 try {
119 worker.addVertexRequest(id, value, edges);
120 } catch (IOException e) {
121 throw new RuntimeException(e);
122 }
123 }
124
125 @Override
126 public void removeVertexRequest(I vertexId) {
127 try {
128 worker.removeVertexRequest(vertexId);
129 } catch (IOException e) {
130 throw new RuntimeException(e);
131 }
132 }
133
134 @Override
135 public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
136 try {
137 worker.addEdgeRequest(sourceVertexId, edge);
138 } catch (IOException e) {
139 throw new RuntimeException(e);
140 }
141 }
142
143 @Override
144 public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
145 try {
146 worker.removeEdgesRequest(sourceVertexId, targetVertexId);
147 } catch (IOException e) {
148 throw new RuntimeException(e);
149 }
150 }
151
152 private BlockWorkerContext getBlockWorkerContext() {
153 return (BlockWorkerContext) worker.getWorkerContext();
154 }
155
156 @Override
157 public Object getWorkerValue() {
158 return getBlockWorkerContext().getWorkerValue();
159 }
160
161 @Override
162 public long getTotalNumEdges() {
163 return worker.getTotalNumEdges();
164 }
165
166 @Override
167 public long getTotalNumVertices() {
168 return worker.getTotalNumVertices();
169 }
170
171 @Override
172 public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
173 OD getOutputDesc(String confOption) {
174 return getBlockWorkerContext().getOutputHandle().<OW, OD>getOutputDesc(
175 confOption);
176 }
177
178 @Override
179 public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
180 return getBlockWorkerContext().getOutputHandle().getWriter(confOption);
181 }
182
183 @Override
184 public int getMyWorkerIndex() {
185 return worker.getMyWorkerIndex();
186 }
187
188 @Override
189 public int getWorkerCount() {
190 return worker.getWorkerCount();
191 }
192
193 @Override
194 public int getWorkerForVertex(I vertexId) {
195 return worker.getWorkerForVertex(vertexId);
196 }
197
198 @Override
199 public Counter getCounter(String group, String name) {
200 return BlockCounters.getCounter(worker.getContext(), group, name);
201 }
202
203 @Override
204 public void progress() {
205 worker.getContext().progress();
206 }
207
208 @Override
209 public void setStatus(String status) {
210 worker.getContext().setStatus(status);
211 }
212 }