This project has retired. For details please refer to its Attic page.
BlockWorkerContext xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.giraph;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.List;
24  
25  import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
26  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
27  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
28  import org.apache.giraph.worker.WorkerContext;
29  import org.apache.giraph.writable.kryo.HadoopKryo;
30  import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.io.WritableComparable;
33  import org.apache.log4j.Logger;
34  
35  /**
36   * WorkerContext that executes receiver and sender blocks passed
37   * into BlockWorkerPieces.
38   */
39  public final class BlockWorkerContext extends WorkerContext
40      implements KryoIgnoreWritable {
41    public static final Logger LOG = Logger.getLogger(BlockWorkerContext.class);
42  
43    private BlockWorkerContextLogic workerLogic;
44  
45    @Override
46    public void preApplication()
47        throws InstantiationException, IllegalAccessException {
48      workerLogic = new BlockWorkerContextLogic();
49      workerLogic.preApplication(new BlockWorkerContextApiWrapper<>(this),
50          new BlockOutputHandle(getContext().getJobID().toString(),
51              getConf(), getContext()));
52    }
53  
54    @Override
55    public void preSuperstep() {
56      List<Writable> messages = getAndClearMessagesFromOtherWorkers();
57      BlockWorkerContextApiWrapper<WritableComparable, Writable> workerApi =
58          new BlockWorkerContextApiWrapper<>(this);
59      BlockWorkerPieces<Object> workerPieces =
60          BlockWorkerPieces.getNextWorkerPieces(this);
61  
62      LOG.info("PassedComputation in " + getSuperstep() +
63          " superstep executing " + workerPieces);
64  
65      workerLogic.preSuperstep(
66          workerApi, workerApi, workerPieces, getSuperstep(), messages);
67    }
68  
69    @Override
70    public void postSuperstep() {
71      workerLogic.postSuperstep();
72    }
73  
74    @Override
75    public void postApplication() {
76      workerLogic.postApplication();
77    }
78  
79    public Object getWorkerValue() {
80      return workerLogic.getWorkerValue();
81    }
82  
83    public BlockOutputHandle getOutputHandle() {
84      return workerLogic.getOutputHandle();
85    }
86  
87    // Cannot extend KryoWritable directly, since WorkerContext is
88    // abstract class, not interface... Additionally conf in parent
89    // class cannot be made transient.
90    // So just add serialization of two individual fields.
91    // (and adding KryoIgnoreWritable to avoid wrapping it twice)
92  
93    @Override
94    public void write(DataOutput out) throws IOException {
95      HadoopKryo.writeClassAndObj(out, workerLogic);
96    }
97  
98    @Override
99    public void readFields(DataInput in) throws IOException {
100     workerLogic = HadoopKryo.readClassAndObj(in);
101     workerLogic.getOutputHandle().initialize(getConf(), getContext());
102   }
103 }