This project has retired. For details please refer to its Attic page.
BlockMasterApiWrapper xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.giraph;
19  
20  import org.apache.giraph.aggregators.Aggregator;
21  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
22  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
23  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
24  import org.apache.giraph.block_app.framework.api.Counter;
25  import org.apache.giraph.block_app.framework.internal.BlockCounters;
26  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
27  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
28  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
29  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
30  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.master.MasterCompute;
33  import org.apache.giraph.reducers.ReduceOperation;
34  import org.apache.hadoop.io.Writable;
35  
36  /**
37   * Giraph implementation of BlockMasterApi, that delegates all calls
38   * to MasterCompute.
39   */
40  final class BlockMasterApiWrapper implements BlockMasterApi,
41      BlockOutputApi, BlockOutputHandleAccessor {
42    private final MasterCompute master;
43    private final BlockOutputHandle outputHandle;
44  
45    public BlockMasterApiWrapper(MasterCompute master,
46                                 BlockOutputHandle outputHandle) {
47      this.master = master;
48      this.outputHandle = outputHandle;
49    }
50  
51    @Override
52    public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
53      return master.getConf();
54    }
55  
56    @Override
57    public void setStatus(String status) {
58      master.getContext().setStatus(status);
59    }
60  
61    @Override
62    public void progress() {
63      master.getContext().progress();
64    }
65  
66    @Override
67    public Counter getCounter(String group, String name) {
68      return BlockCounters.getCounter(master.getContext(), group, name);
69    }
70  
71    @Override
72    public <R extends Writable> R getReduced(String name) {
73      return master.getReduced(name);
74    }
75  
76    @Override
77    public void broadcast(String name, Writable value) {
78      master.broadcast(name, value);
79    }
80  
81    @Override
82    public <S, R extends Writable> void registerReducer(
83        String name, ReduceOperation<S, R> reduceOp) {
84      master.registerReducer(name, reduceOp);
85    }
86  
87    @Override
88    public <S, R extends Writable> void registerReducer(
89        String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
90      master.registerReducer(name, reduceOp, globalInitialValue);
91    }
92  
93    @Override
94    public <A extends Writable> A getAggregatedValue(String name) {
95      return master.getAggregatedValue(name);
96    }
97  
98    @Override
99    public <A extends Writable>
100   boolean registerAggregator(
101       String name, Class<? extends Aggregator<A>> aggregatorClass
102   ) throws InstantiationException, IllegalAccessException {
103     return master.registerAggregator(name, aggregatorClass);
104   }
105 
106   @Override
107   public <A extends Writable>
108   boolean registerPersistentAggregator(
109       String name, Class<? extends Aggregator<A>> aggregatorClass
110   ) throws InstantiationException,
111       IllegalAccessException {
112     return master.registerPersistentAggregator(name, aggregatorClass);
113   }
114 
115   @Override
116   public <A extends Writable> void setAggregatedValue(String name, A value) {
117     master.setAggregatedValue(name, value);
118   }
119 
120   @Override
121   public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
122     BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
123     master.broadcast(handle.getName(), object);
124     return handle;
125   }
126 
127   @Override
128   @Deprecated
129   public long getTotalNumEdges() {
130     return master.getTotalNumEdges();
131   }
132 
133   @Override
134   @Deprecated
135   public long getTotalNumVertices() {
136     return master.getTotalNumVertices();
137   }
138 
139   @Override
140   public void logToCommandLine(String line) {
141     master.logToCommandLine(line);
142   }
143 
144   @Override
145   public <OW extends BlockOutputWriter,
146       OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
147     return outputHandle.<OW, OD>getOutputDesc(confOption);
148   }
149 
150   @Override
151   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
152     return outputHandle.getWriter(confOption);
153   }
154 
155   @Override
156   public BlockOutputHandle getBlockOutputHandle() {
157     return outputHandle;
158   }
159 
160   @Override
161   public int getWorkerCount() {
162     return master.getWorkerInfoList().size();
163   }
164 }