This project has retired. For details please refer to its
Attic page.
MigrationPiece xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.migration;
19
20 import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
21
22 import java.io.IOException;
23 import java.util.Collections;
24 import java.util.List;
25
26 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
27 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
28 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
29 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
30 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
31 import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
32 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
33 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
34 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
35 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
36 import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
37 import org.apache.giraph.combiner.MessageCombiner;
38 import org.apache.giraph.conf.DefaultMessageClasses;
39 import org.apache.giraph.conf.GiraphConfiguration;
40 import org.apache.giraph.conf.GiraphConstants;
41 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
42 import org.apache.giraph.conf.MessageClasses;
43 import org.apache.giraph.conf.TypesHolder;
44 import org.apache.giraph.factories.DefaultMessageValueFactory;
45 import org.apache.giraph.function.Consumer;
46 import org.apache.giraph.function.ObjectTransfer;
47 import org.apache.giraph.function.Supplier;
48 import org.apache.giraph.graph.Vertex;
49 import org.apache.giraph.utils.ReflectionUtils;
50 import org.apache.hadoop.io.Writable;
51 import org.apache.hadoop.io.WritableComparable;
52
53 import com.google.common.base.Preconditions;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 @SuppressWarnings("rawtypes")
100 public final class MigrationPiece<I extends WritableComparable,
101 V extends Writable, E extends Writable, MPrev extends Writable,
102 M extends Writable> extends PieceWithWorkerContext<I, V, E, M,
103 MigrationWorkerContext, Writable, MigrationSuperstepStage> {
104
105 private final Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
106 computationClass;
107
108 private final transient MigrationMasterCompute masterCompute;
109 private final Supplier<Iterable<MPrev>> previousMessagesSupplier;
110 private final Consumer<Iterable<M>> currentMessagesConsumer;
111 private final transient Class<M> messageClass;
112 private final transient Class<? extends MessageCombiner<? super I, M>>
113 messageCombinerClass;
114
115 private final boolean isFullMigration;
116 private final boolean isFirstStep;
117
118 private transient MigrationPiece nextPiece;
119 private boolean isHalted;
120
121 private MigrationPiece(
122 Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
123 computationClass,
124 MigrationMasterCompute masterCompute, Supplier<Iterable<MPrev>>
125 previousMessagesSupplier,
126 Consumer<Iterable<M>> currentMessagesConsumer, Class<M> messageClass,
127 Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
128 boolean isFullMigration, boolean isFirstStep) {
129 this.computationClass = computationClass;
130 this.masterCompute = masterCompute;
131 this.previousMessagesSupplier = previousMessagesSupplier;
132 this.currentMessagesConsumer = currentMessagesConsumer;
133 this.messageClass = messageClass;
134 this.messageCombinerClass = messageCombinerClass;
135 this.isFullMigration = isFullMigration;
136 this.isFirstStep = isFirstStep;
137 isHalted = false;
138 nextPiece = null;
139 sanityChecks();
140 }
141
142
143 @SuppressWarnings("unchecked")
144 static <I extends WritableComparable, V extends Writable, E extends Writable,
145 MR extends Writable, MS extends Writable>
146 MigrationPiece<I, V, E, MR, MS> createFirstFullMigrationPiece(
147 Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
148 computationClass,
149 MigrationFullMasterCompute masterCompute,
150 Class<MS> messageClass,
151 Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
152 ObjectTransfer transfer = new ObjectTransfer();
153 return new MigrationPiece<>(
154 computationClass, masterCompute, transfer, transfer, messageClass,
155 messageCombinerClass,
156 true, true);
157 }
158
159 public static <I extends WritableComparable, V extends Writable,
160 E extends Writable, MR extends Writable, MS extends Writable>
161 MigrationPiece<I, V, E, MR, MS> createMigrationPiece(
162 Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
163 computationClass,
164 MigrationMasterCompute masterCompute,
165 Supplier<Iterable<MR>> previousMessagesSupplier,
166 Consumer<Iterable<MS>> currentMessagesConsumer,
167 Class<MS> messageClass,
168 Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
169 return new MigrationPiece<>(
170 computationClass, masterCompute, previousMessagesSupplier,
171 currentMessagesConsumer, messageClass, messageCombinerClass,
172 false, false);
173 }
174
175
176 private void sanityChecks() {
177 Preconditions.checkState(isFullMigration ==
178 MigrationFullAbstractComputation.class
179 .isAssignableFrom(computationClass));
180 }
181
182 void sanityTypeChecks(
183 GiraphConfiguration conf, Class<?> previousMessageClass) {
184 if (computationClass != null) {
185 final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf);
186 final Class<?> vertexValueClass =
187 GiraphConstants.VERTEX_VALUE_CLASS.get(conf);
188 final Class<?> edgeValueClass =
189 GiraphConstants.EDGE_VALUE_CLASS.get(conf);
190
191 Class<?>[] classList = getTypeArguments(
192 TypesHolder.class, computationClass);
193 Preconditions.checkArgument(classList.length == 5);
194
195 ReflectionUtils.verifyTypes(
196 vertexIdClass, classList[0], "vertexId", computationClass);
197 ReflectionUtils.verifyTypes(
198 vertexValueClass, classList[1], "vertexValue", computationClass);
199 ReflectionUtils.verifyTypes(
200 edgeValueClass, classList[2], "edgeValue", computationClass);
201 if (previousMessageClass != null) {
202 ReflectionUtils.verifyTypes(
203 previousMessageClass, classList[3], "recvMessage",
204 computationClass);
205 }
206 ReflectionUtils.verifyTypes(
207 messageClass, classList[4], "sendMessage", computationClass);
208 }
209 }
210
211 @Override
212 public void registerAggregators(BlockMasterApi masterApi)
213 throws InstantiationException, IllegalAccessException {
214 if (masterCompute != null) {
215 masterCompute.init(masterApi);
216 masterCompute.initialize();
217 }
218 }
219
220 @Override
221 public VertexSender<I, V, E> getVertexSender(
222 BlockWorkerSendApi<I, V, E, M> workerApi,
223 MigrationSuperstepStage executionStage) {
224 if (computationClass == null || isFirstStep) {
225 return null;
226 }
227
228 final MigrationAbstractComputation<I, V, E, MPrev, M> computation =
229 ReflectionUtils.newInstance(computationClass);
230 computation.init(
231 workerApi, getWorkerValue(workerApi),
232 executionStage.getMigrationSuperstep() - 1);
233 computation.preSuperstep();
234
235 return new InnerVertexSender() {
236 @Override
237 public void vertexSend(Vertex<I, V, E> vertex) {
238 try {
239 Iterable<MPrev> messages = null;
240 if (previousMessagesSupplier != null) {
241 messages = previousMessagesSupplier.get();
242 }
243 if (messages == null) {
244 messages = Collections.<MPrev>emptyList();
245 }
246 computation.compute(vertex, messages);
247 } catch (IOException e) {
248 throw new RuntimeException(e);
249 }
250 }
251
252 @Override
253 public void postprocess() {
254 computation.postSuperstep();
255 }
256 };
257 }
258
259 @Override
260 public void workerContextSend(
261 BlockWorkerContextSendApi<I, Writable> workerContextApi,
262 MigrationSuperstepStage executionStage,
263 MigrationWorkerContext workerValue) {
264 if (workerValue != null && !isFirstStep) {
265 workerValue.setApi(workerContextApi);
266 workerValue.postSuperstep();
267 }
268 }
269
270 @SuppressWarnings("unchecked")
271 @Override
272 public void masterCompute(BlockMasterApi masterApi,
273 MigrationSuperstepStage executionStage) {
274 MigrationFullMasterCompute masterComputeF =
275 isFullMigration ? (MigrationFullMasterCompute) masterCompute : null;
276
277 if (masterCompute != null) {
278 masterCompute.init(masterApi);
279
280 if (masterComputeF != null) {
281 masterComputeF.init(
282 executionStage.getMigrationSuperstep(),
283 computationClass, messageClass, messageCombinerClass);
284 }
285
286 masterCompute.compute();
287 }
288
289 if (isFullMigration) {
290 if (masterComputeF != null) {
291 isHalted = masterComputeF.isHalted();
292 if (masterComputeF.isHalted()) {
293 nextPiece = null;
294 } else {
295 if (masterComputeF.getNewComputationClass() != null ||
296 masterComputeF.getNewMessage() != null ||
297 masterComputeF.getNewMessageCombiner() != null) {
298 nextPiece = new MigrationPiece(
299 masterComputeF.getComputationClass(),
300 masterComputeF,
301 previousMessagesSupplier,
302 currentMessagesConsumer,
303 masterComputeF.getOutgoingMessage(),
304 masterComputeF.getMessageCombiner(),
305 true, false);
306 } else {
307 nextPiece = this;
308 }
309 }
310 } else {
311 nextPiece = this;
312 }
313 if (nextPiece != null) {
314 if (nextPiece.isFirstStep) {
315 nextPiece = new MigrationPiece<>(
316 computationClass,
317 masterComputeF,
318 previousMessagesSupplier,
319 currentMessagesConsumer,
320 messageClass,
321 messageCombinerClass,
322 true, false);
323 }
324 nextPiece.sanityTypeChecks(masterApi.getConf(), messageClass);
325 }
326 } else {
327 Preconditions.checkState(!isHalted);
328 Preconditions.checkState(nextPiece == null);
329 }
330 }
331
332 @Override
333 public void workerContextReceive(
334 BlockWorkerContextReceiveApi workerContextApi,
335 MigrationSuperstepStage executionStage,
336 MigrationWorkerContext workerValue, List<Writable> workerMessages) {
337 if (workerValue != null) {
338 workerValue.setApi(workerContextApi);
339 workerValue.setReceivedMessages(workerMessages);
340
341 if (isFirstStep && workerValue instanceof MigrationFullWorkerContext) {
342 try {
343 ((MigrationFullWorkerContext) workerValue).preApplication();
344 } catch (InstantiationException | IllegalAccessException e) {
345 throw new RuntimeException(e);
346 }
347 }
348
349 if (!isHalted) {
350 workerValue.preSuperstep();
351 }
352
353 if (isHalted && workerValue instanceof MigrationFullWorkerContext) {
354 ((MigrationFullWorkerContext) workerValue).postApplication();
355 }
356 }
357 }
358
359 @Override
360 public VertexReceiver<I, V, E, M> getVertexReceiver(
361 BlockWorkerReceiveApi<I> workerApi,
362 MigrationSuperstepStage executionStage) {
363 if (currentMessagesConsumer == null || isHalted) {
364 return null;
365 }
366
367 return new InnerVertexReceiver() {
368 @Override
369 public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
370 currentMessagesConsumer.apply(messages);
371 }
372 };
373 }
374
375 @Override
376 public MessageClasses<I, M> getMessageClasses(
377 ImmutableClassesGiraphConfiguration conf) {
378 return new DefaultMessageClasses(
379 messageClass,
380 DefaultMessageValueFactory.class,
381 messageCombinerClass,
382 GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
383 }
384
385 @Override
386 public MigrationSuperstepStage nextExecutionStage(
387 MigrationSuperstepStage executionStage) {
388 return executionStage.changedMigrationSuperstep(
389 executionStage.getMigrationSuperstep() + 1);
390 }
391
392 public MigrationPiece getNextPiece() {
393 Preconditions.checkState(isFullMigration);
394 MigrationPiece res = nextPiece;
395 nextPiece = null;
396 return res;
397 }
398
399 }