This project has retired. For details please refer to its Attic page.
BlockWorkerContextLogic xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.internal;
19  
20  import java.util.List;
21  
22  import org.apache.giraph.block_app.framework.BlockUtils;
23  import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
24  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
25  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
26  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.log4j.Logger;
29  
30  /**
31   * Block execution logic on WorkerContext.
32   */
33  @SuppressWarnings({ "rawtypes" })
34  public class BlockWorkerContextLogic {
35    public static final Logger LOG =
36        Logger.getLogger(BlockWorkerContextLogic.class);
37  
38    private Object workerValue;
39    private BlockWorkerPieces workerPieces;
40    private BlockOutputHandle outputHandle;
41  
42    private transient BlockWorkerContextSendApi sendApi;
43  
44    public BlockWorkerContextLogic() {
45    }
46  
47    public void preApplication(BlockWorkerContextApi api,
48        BlockOutputHandle outputHandle) {
49      workerValue =
50          BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.newInstance(api.getConf());
51      this.outputHandle = outputHandle;
52    }
53  
54    public Object getWorkerValue() {
55      return workerValue;
56    }
57  
58    public BlockOutputHandle getOutputHandle() {
59      return outputHandle;
60    }
61  
62    @SuppressWarnings("unchecked")
63    public void preSuperstep(
64        BlockWorkerContextReceiveApi receiveApi,
65        BlockWorkerContextSendApi sendApi,
66        BlockWorkerPieces workerPieces, long superstep,
67        List<Writable> messages) {
68      workerPieces.getBlockApiHandle().setWorkerContextReceiveApi(receiveApi);
69      workerPieces.getBlockApiHandle().setWorkerContextSendApi(sendApi);
70      if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
71        LOG.info("Worker executing " + workerPieces + " in " + superstep +
72            " superstep");
73      }
74      this.sendApi = sendApi;
75      this.workerPieces = workerPieces;
76      if (workerPieces.getReceiver() != null) {
77        workerPieces.getReceiver().workerContextReceive(
78            receiveApi, workerValue, messages);
79      }
80    }
81  
82    public void postSuperstep() {
83      if (workerPieces.getSender() != null) {
84        workerPieces.getSender().workerContextSend(sendApi, workerValue);
85      }
86      workerPieces = null;
87      sendApi = null;
88      outputHandle.returnAllWriters();
89    }
90  
91    public void postApplication() {
92      outputHandle.closeAllWriters();
93      // TODO add support through conf for postApplication, if needed.
94    }
95  }