This project has retired. For details please refer to its Attic page.
BlockWorkerLogic xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.internal;
19  
20  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
21  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
22  import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
23  import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
24  import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
25  import org.apache.giraph.graph.Vertex;
26  
27  /**
28   * Block execution logic on workers.
29   */
30  @SuppressWarnings({ "rawtypes", "unchecked" })
31  public class BlockWorkerLogic {
32    private final BlockWorkerPieces pieces;
33  
34    private transient VertexReceiver receiveFunctions;
35    private transient InnerVertexSender sendFunctions;
36  
37    public BlockWorkerLogic(BlockWorkerPieces pieces) {
38      this.pieces = pieces;
39    }
40  
41    public void preSuperstep(
42        BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) {
43      pieces.getBlockApiHandle().setWorkerReceiveApi(receiveApi);
44      pieces.getBlockApiHandle().setWorkerSendApi(sendApi);
45      if (pieces.getReceiver() != null) {
46        receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi);
47      }
48      if (pieces.getSender() != null) {
49        sendFunctions = pieces.getSender().getVertexSender(sendApi);
50      }
51    }
52  
53    public void compute(Vertex vertex, Iterable messages) {
54      if (receiveFunctions != null) {
55        receiveFunctions.vertexReceive(vertex, messages);
56      }
57      if (sendFunctions != null) {
58        sendFunctions.vertexSend(vertex);
59      }
60    }
61  
62    public void postSuperstep() {
63      if (receiveFunctions instanceof VertexPostprocessor) {
64        ((VertexPostprocessor) receiveFunctions).postprocess();
65      }
66      if (sendFunctions != null) {
67        sendFunctions.postprocess();
68      }
69    }
70  }