View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.giraph;
19  
20  import org.apache.giraph.aggregators.Aggregator;
21  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
22  import org.apache.giraph.block_app.framework.api.BlockOutputApi;
23  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
24  import org.apache.giraph.block_app.framework.api.Counter;
25  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
26  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
27  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
28  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
29  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.master.MasterCompute;
32  import org.apache.giraph.reducers.ReduceOperation;
33  import org.apache.hadoop.io.Writable;
34  
35  /**
36   * Giraph implementation of BlockMasterApi, that delegates all calls
37   * to MasterCompute.
38   */
39  final class BlockMasterApiWrapper implements BlockMasterApi,
40      BlockOutputApi, BlockOutputHandleAccessor {
41    private final MasterCompute master;
42    private final BlockOutputHandle outputHandle;
43  
44    public BlockMasterApiWrapper(MasterCompute master,
45                                 BlockOutputHandle outputHandle) {
46      this.master = master;
47      this.outputHandle = outputHandle;
48    }
49  
50    @Override
51    public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() {
52      return master.getConf();
53    }
54  
55    @Override
56    public void setStatus(String status) {
57      master.getContext().setStatus(status);
58    }
59  
60    @Override
61    public void progress() {
62      master.getContext().progress();
63    }
64  
65    @Override
66    public Counter getCounter(String group, String name) {
67      final org.apache.hadoop.mapreduce.Counter counter =
68          master.getContext().getCounter(group, name);
69      return new Counter() {
70        @Override
71        public void increment(long incr) {
72          counter.increment(incr);
73        }
74  
75        @Override
76        public void setValue(long value) {
77          counter.setValue(value);
78        }
79      };
80    }
81  
82    @Override
83    public <R extends Writable> R getReduced(String name) {
84      return master.getReduced(name);
85    }
86  
87    @Override
88    public void broadcast(String name, Writable value) {
89      master.broadcast(name, value);
90    }
91  
92    @Override
93    public <S, R extends Writable> void registerReducer(
94        String name, ReduceOperation<S, R> reduceOp) {
95      master.registerReducer(name, reduceOp);
96    }
97  
98    @Override
99    public <S, R extends Writable> void registerReducer(
100       String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) {
101     master.registerReducer(name, reduceOp, globalInitialValue);
102   }
103 
104   @Override
105   public <A extends Writable> A getAggregatedValue(String name) {
106     return master.getAggregatedValue(name);
107   }
108 
109   @Override
110   public <A extends Writable>
111   boolean registerAggregator(
112       String name, Class<? extends Aggregator<A>> aggregatorClass
113   ) throws InstantiationException, IllegalAccessException {
114     return master.registerAggregator(name, aggregatorClass);
115   }
116 
117   @Override
118   public <A extends Writable>
119   boolean registerPersistentAggregator(
120       String name, Class<? extends Aggregator<A>> aggregatorClass
121   ) throws InstantiationException,
122       IllegalAccessException {
123     return master.registerPersistentAggregator(name, aggregatorClass);
124   }
125 
126   @Override
127   public <A extends Writable> void setAggregatedValue(String name, A value) {
128     master.setAggregatedValue(name, value);
129   }
130 
131   @Override
132   public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
133     BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
134     master.broadcast(handle.getName(), object);
135     return handle;
136   }
137 
138   @Override
139   @Deprecated
140   public long getTotalNumEdges() {
141     return master.getTotalNumEdges();
142   }
143 
144   @Override
145   @Deprecated
146   public long getTotalNumVertices() {
147     return master.getTotalNumVertices();
148   }
149 
150   @Override
151   public void logToCommandLine(String line) {
152     master.logToCommandLine(line);
153   }
154 
155   @Override
156   public <OW extends BlockOutputWriter,
157       OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
158     return outputHandle.<OW, OD>getOutputDesc(confOption);
159   }
160 
161   @Override
162   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
163     return outputHandle.getWriter(confOption);
164   }
165 
166   @Override
167   public BlockOutputHandle getBlockOutputHandle() {
168     return outputHandle;
169   }
170 
171   @Override
172   public int getWorkerCount() {
173     return master.getWorkerInfoList().size();
174   }
175 }