1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.api.giraph;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
2324import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
25import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
26import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27import org.apache.giraph.master.MasterCompute;
28import org.apache.giraph.writable.kryo.KryoWritableWrapper;
2930/**31 * MasterCompute class which executes block computation.32 *33 * @param <S> Execution stage type34 */35publicfinalclass BlockMasterCompute<S> extendsMasterCompute {
36private BlockMasterLogic<S> blockMasterLogic = new BlockMasterLogic<>();
3738 @Override
39publicvoid initialize() throws InstantiationException,
40 IllegalAccessException {
41 blockMasterLogic.initialize(getConf(), new BlockMasterApiWrapper(this,
42newBlockOutputHandle(getContext().getJobID().toString(),
43 getConf(), getContext())));
44 }
4546 @Override
47publicvoid compute() {
48 BlockWorkerPieces<S> workerPieces =
49 blockMasterLogic.computeNext(getSuperstep());
50if (workerPieces == null) {
51 haltComputation();
52 } else {
53 BlockWorkerPieces.setNextWorkerPieces(this, workerPieces);
54 }
55 }
5657 @Override
58publicvoid write(DataOutput out) throws IOException {
59new KryoWritableWrapper<>(blockMasterLogic).write(out);
60 }
6162 @Override
63publicvoid readFields(DataInput in) throws IOException {
64 KryoWritableWrapper<BlockMasterLogic<S>> object =
65new KryoWritableWrapper<>();
66 object.readFields(in);
67 blockMasterLogic = object.get();
68 blockMasterLogic.initializeAfterRead(newBlockMasterApiWrapper(this,
69newBlockOutputHandle()));
70 }
71 }