1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.framework.internal;
1920import java.util.List;
2122import org.apache.giraph.block_app.framework.BlockUtils;
23import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
24import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27import org.apache.hadoop.io.Writable;
28import org.apache.log4j.Logger;
2930/**31 * Block execution logic on WorkerContext.32 */33 @SuppressWarnings({ "rawtypes" })
34publicclassBlockWorkerContextLogic {
35publicstaticfinal Logger LOG =
36 Logger.getLogger(BlockWorkerContextLogic.class);
3738private Object workerValue;
39privateBlockWorkerPieces workerPieces;
40privateBlockOutputHandle outputHandle;
4142privatetransientBlockWorkerContextSendApi sendApi;
4344publicBlockWorkerContextLogic() {
45 }
4647publicvoid preApplication(BlockWorkerContextApi api,
48BlockOutputHandle outputHandle) {
49 workerValue =
50 BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.newInstance(api.getConf());
51this.outputHandle = outputHandle;
52 }
5354public Object getWorkerValue() {
55return workerValue;
56 }
5758publicBlockOutputHandle getOutputHandle() {
59return outputHandle;
60 }
6162 @SuppressWarnings("unchecked")
63publicvoid preSuperstep(
64BlockWorkerContextReceiveApi receiveApi,
65BlockWorkerContextSendApi sendApi,
66BlockWorkerPieces workerPieces, long superstep,
67 List<Writable> messages) {
68 workerPieces.getBlockApiHandle().setWorkerContextReceiveApi(receiveApi);
69 workerPieces.getBlockApiHandle().setWorkerContextSendApi(sendApi);
70if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
71 LOG.info("Worker executing " + workerPieces + " in " + superstep +
72" superstep");
73 }
74this.sendApi = sendApi;
75this.workerPieces = workerPieces;
76if (workerPieces.getReceiver() != null) {
77 workerPieces.getReceiver().workerContextReceive(
78 receiveApi, workerValue, messages);
79 }
80 }
8182publicvoid postSuperstep() {
83if (workerPieces.getSender() != null) {
84 workerPieces.getSender().workerContextSend(sendApi, workerValue);
85 }
86 workerPieces = null;
87 sendApi = null;
88 outputHandle.returnAllWriters();
89 }
9091publicvoid postApplication() {
92 outputHandle.closeAllWriters();
93// TODO add support through conf for postApplication, if needed.94 }
95 }