1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.api.giraph;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
23import java.util.List;
2425import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
26import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
27import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
28import org.apache.giraph.worker.WorkerContext;
29import org.apache.giraph.writable.kryo.HadoopKryo;
30import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
31import org.apache.hadoop.io.Writable;
32import org.apache.hadoop.io.WritableComparable;
33import org.apache.log4j.Logger;
3435/**36 * WorkerContext that executes receiver and sender blocks passed37 * into BlockWorkerPieces.38 */39publicfinalclassBlockWorkerContextextendsWorkerContext40implementsKryoIgnoreWritable {
41publicstaticfinal Logger LOG = Logger.getLogger(BlockWorkerContext.class);
4243privateBlockWorkerContextLogic workerLogic;
4445 @Override
46publicvoid preApplication()
47throws InstantiationException, IllegalAccessException {
48 workerLogic = newBlockWorkerContextLogic();
49 workerLogic.preApplication(new BlockWorkerContextApiWrapper<>(this),
50newBlockOutputHandle(getContext().getJobID().toString(),
51 getConf(), getContext()));
52 }
5354 @Override
55publicvoid preSuperstep() {
56 List<Writable> messages = getAndClearMessagesFromOtherWorkers();
57 BlockWorkerContextApiWrapper<WritableComparable, Writable> workerApi =
58new BlockWorkerContextApiWrapper<>(this);
59 BlockWorkerPieces<Object> workerPieces =
60 BlockWorkerPieces.getNextWorkerPieces(this);
6162 LOG.info("PassedComputation in " + getSuperstep() +
63" superstep executing " + workerPieces);
6465 workerLogic.preSuperstep(
66 workerApi, workerApi, workerPieces, getSuperstep(), messages);
67 }
6869 @Override
70publicvoid postSuperstep() {
71 workerLogic.postSuperstep();
72 }
7374 @Override
75publicvoid postApplication() {
76 workerLogic.postApplication();
77 }
7879public Object getWorkerValue() {
80return workerLogic.getWorkerValue();
81 }
8283publicBlockOutputHandle getOutputHandle() {
84return workerLogic.getOutputHandle();
85 }
8687// Cannot extend KryoWritable directly, since WorkerContext is88// abstract class, not interface... Additionally conf in parent89// class cannot be made transient.90// So just add serialization of two individual fields.91// (and adding KryoIgnoreWritable to avoid wrapping it twice)9293 @Override
94publicvoid write(DataOutput out) throws IOException {
95 HadoopKryo.writeClassAndObj(out, workerLogic);
96 }
9798 @Override
99publicvoid readFields(DataInput in) throws IOException {
100 workerLogic = HadoopKryo.readClassAndObj(in);
101 workerLogic.getOutputHandle().initialize(getConf(), getContext());
102 }
103 }