View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.util.ArrayList;
21  import java.util.Collections;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ThreadLocalRandom;
28  
29  import org.apache.giraph.aggregators.Aggregator;
30  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
31  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
32  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
33  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
34  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
35  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
36  import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
37  import org.apache.giraph.block_app.framework.api.Counter;
38  import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore;
39  import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
40  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
41  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
42  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
43  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
44  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
45  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
46  import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
47  import org.apache.giraph.conf.GiraphConstants;
48  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
49  import org.apache.giraph.edge.Edge;
50  import org.apache.giraph.edge.OutEdges;
51  import org.apache.giraph.graph.Vertex;
52  import org.apache.giraph.graph.VertexMutations;
53  import org.apache.giraph.graph.VertexResolver;
54  import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
55  import org.apache.giraph.partition.GraphPartitionerFactory;
56  import org.apache.giraph.partition.Partition;
57  import org.apache.giraph.reducers.ReduceOperation;
58  import org.apache.giraph.utils.TestGraph;
59  import org.apache.giraph.utils.WritableUtils;
60  import org.apache.giraph.worker.WorkerAggregatorDelegator;
61  import org.apache.giraph.worker.WorkerGlobalCommUsage;
62  import org.apache.hadoop.io.Writable;
63  import org.apache.hadoop.io.WritableComparable;
64  
65  import com.google.common.base.Preconditions;
66  
67  /**
68   * Internal implementation of Block API interfaces - representing an in-memory
69   * giraph instance.
70   *
71   * @param <I> Vertex id type
72   * @param <V> Vertex value type
73   * @param <E> Edge value type
74   */
75  @SuppressWarnings({ "rawtypes", "unchecked" })
76  class InternalApi<I extends WritableComparable, V extends Writable,
77      E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
78    private final TestGraph<I, V, E> inputGraph;
79    private final List<Partition<I, V, E>> partitions;
80    private final GraphPartitionerFactory<I, V, E> partitionerFactory;
81  
82    private final ImmutableClassesGiraphConfiguration conf;
83    private final boolean runAllChecks;
84    private final InternalAggregators globalComm;
85    private final AggregatorToGlobalCommTranslation aggregators;
86  
87    private final boolean createVertexOnMsgs;
88    private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations;
89  
90    private InternalMessageStore previousMessages;
91    private InternalMessageStore nextMessages;
92  
93    private final InternalWorkerApi workerApi;
94    private final BlockWorkerContextLogic workerContextLogic;
95    private List<Writable> previousWorkerMessages;
96    private List<Writable> nextWorkerMessages;
97  
98    public InternalApi(
99        TestGraph<I, V, E> graph,
100       ImmutableClassesGiraphConfiguration conf,
101       int numPartitions,
102       boolean runAllChecks) {
103     this.inputGraph = graph;
104     this.partitions = new ArrayList<>(numPartitions);
105     for (int i = 0; i < numPartitions; i++) {
106       this.partitions.add(conf.createPartition(i, null));
107     }
108     this.partitionerFactory = conf.createGraphPartitioner();
109     Preconditions.checkNotNull(this.partitionerFactory);
110     Preconditions.checkState(this.partitions.size() == numPartitions);
111 
112     for (Vertex<I, V, E> vertex : graph) {
113       getPartition(vertex.getId()).putVertex(vertex);
114     }
115     graph.clear();
116 
117     this.conf = conf;
118     this.runAllChecks = runAllChecks;
119     this.globalComm = new InternalAggregators(runAllChecks);
120     this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm);
121     this.mutations = new ConcurrentHashMap<>();
122     this.workerApi = new InternalWorkerApi();
123     this.workerApi.setConf(conf);
124     this.workerApi.setWorkerGlobalCommUsage(this.globalComm);
125 
126     this.createVertexOnMsgs =
127         GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf);
128     workerContextLogic = new BlockWorkerContextLogic();
129   }
130 
131   /**
132    * Wrapper for calling Worker API interface.
133    * Needs to be separate from Master API, since getAggregatedValue
134    * has different implementation on worker and on master.
135    */
136   class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
137       implements BlockWorkerSendApi<I, V, E, Writable>,
138       BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>,
139       BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor,
140       WorkerGlobalCommUsage {
141 
142     @Override
143     public void addVertexRequest(I id, V value) {
144       addVertexRequest(id, value, conf.createAndInitializeOutEdges());
145     }
146 
147     @Override
148     public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
149       Vertex<I, V, E> vertex = conf.createVertex();
150       vertex.initialize(id, value, edges);
151       getMutationFor(id).addVertex(vertex);
152     }
153 
154     @Override
155     public void removeVertexRequest(I vertexId) {
156       getMutationFor(vertexId).removeVertex();
157     }
158 
159     @Override
160     public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
161       getMutationFor(sourceVertexId).addEdge(edge);
162     }
163 
164     @Override
165     public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
166       getMutationFor(sourceVertexId).removeEdge(targetVertexId);
167     }
168 
169     @Override
170     public void sendMessage(I id, Writable message) {
171       nextMessages.sendMessage(id, message);
172     }
173 
174     @Override
175     public void sendMessageToAllEdges(
176         Vertex<I, V, E> vertex, Writable message) {
177       sendMessageToMultipleEdges(
178           new TargetVertexIdIterator<>(vertex),
179           message);
180     }
181 
182     @Override
183     public void sendMessageToMultipleEdges(
184         Iterator<I> vertexIdIterator, Writable message) {
185       nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message);
186     }
187 
188     @Override
189     public int getMyWorkerIndex() {
190       return 0;
191     }
192 
193     @Override
194     public int getWorkerCount() {
195       return 1;
196     }
197 
198     @Override
199     public int getWorkerForVertex(I vertexId) {
200       return 0;
201     }
202 
203     @Override
204     public void sendMessageToWorker(Writable message, int workerIndex) {
205       Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
206           "With just one worker you can only send worker message to itself, " +
207               "but tried to send to " + workerIndex);
208       nextWorkerMessages.add(message);
209     }
210 
211     @Override
212     public Object getWorkerValue() {
213       return workerContextLogic.getWorkerValue();
214     }
215 
216     @Override
217     public long getTotalNumVertices() {
218       return InternalApi.this.getTotalNumVertices();
219     }
220 
221     @Override
222     public long getTotalNumEdges() {
223       return InternalApi.this.getTotalNumEdges();
224     }
225 
226     @Override
227     public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
228     OD getOutputDesc(String confOption) {
229       return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
230           confOption);
231     }
232 
233     @Override
234     public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
235       return workerContextLogic.getOutputHandle().getWriter(confOption);
236     }
237   }
238 
239   @Override
240   public void broadcast(String name, Writable value) {
241     globalComm.broadcast(name, value);
242   }
243 
244   @Override
245   public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
246     BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
247     broadcast(handle.getName(), object);
248     return handle;
249   }
250 
251   @Override
252   public <S, R extends Writable> void registerReducer(
253       String name, ReduceOperation<S, R> reduceOp) {
254     globalComm.registerReducer(name, reduceOp);
255   }
256 
257   @Override
258   public <S, R extends Writable> void registerReducer(
259       String name, ReduceOperation<S, R> reduceOp,
260       R globalInitialValue) {
261     globalComm.registerReducer(name, reduceOp, globalInitialValue);
262   }
263 
264   @Override
265   public <R extends Writable> R getReduced(String name) {
266     return globalComm.getReduced(name);
267   }
268 
269   @Override
270   public <A extends Writable> A getAggregatedValue(String name) {
271     return aggregators.getAggregatedValue(name);
272   }
273 
274   @Override
275   public <A extends Writable> void setAggregatedValue(String name, A value) {
276     aggregators.setAggregatedValue(name, value);
277   }
278 
279   @Override
280   public <A extends Writable>
281   boolean registerAggregator(
282       String name, Class<? extends Aggregator<A>> aggregatorClass)
283       throws InstantiationException, IllegalAccessException {
284     return aggregators.registerAggregator(name, aggregatorClass);
285   }
286 
287   @Override
288   public <A extends Writable>
289   boolean registerPersistentAggregator(
290       String name, Class<? extends Aggregator<A>> aggregatorClass)
291       throws InstantiationException, IllegalAccessException {
292     return aggregators.registerPersistentAggregator(name, aggregatorClass);
293   }
294 
295   @Override
296   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
297     return conf;
298   }
299 
300   @Override
301   public void setStatus(String status) {
302   }
303 
304   @Override
305   public void progress() {
306   }
307 
308   @Override
309   public Counter getCounter(final String group, final String name) {
310     return new Counter() {
311       @Override
312       public void increment(long incr) {
313       }
314       @Override
315       public void setValue(long value) {
316       }
317     };
318   }
319 
320   private VertexMutations<I, V, E> getMutationFor(I vertexId) {
321     VertexMutations<I, V, E> curMutations = new VertexMutations<>();
322     VertexMutations<I, V, E> prevMutations =
323         mutations.putIfAbsent(vertexId, curMutations);
324     if (prevMutations != null) {
325       curMutations = prevMutations;
326     }
327     return curMutations;
328   }
329 
330   public Iterable takeMessages(I id) {
331     if (previousMessages != null) {
332       Iterable result = previousMessages.takeMessages(id);
333       if (result != null) {
334         return result;
335       }
336     }
337     return Collections.emptyList();
338   }
339 
340   public List<Writable> takeWorkerMessages() {
341     if (previousWorkerMessages != null) {
342       List<Writable> ret = new ArrayList<>(previousWorkerMessages.size());
343       for (Writable message : previousWorkerMessages) {
344         // Use message copies probabilistically, to catch both not serializing
345         // some fields, and storing references from message object itself
346         // (which can be reusable).
347         ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ?
348             WritableUtils.createCopy(message) : message);
349       }
350       previousWorkerMessages = null;
351       if (runAllChecks) {
352         Collections.shuffle(ret);
353       }
354       return ret;
355     }
356     return Collections.emptyList();
357   }
358 
359   public void afterWorkerBeforeMaster() {
360     globalComm.afterWorkerBeforeMaster();
361     aggregators.prepareSuperstep();
362   }
363 
364   public void afterMasterBeforeWorker() {
365     aggregators.postMasterCompute();
366   }
367 
368   public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
369     afterMasterBeforeWorker();
370 
371     previousMessages = nextMessages;
372     previousWorkerMessages = nextWorkerMessages;
373 
374     nextMessages = InternalConcurrentMessageStore.createMessageStore(
375         conf, computation, runAllChecks);
376     nextWorkerMessages = new ArrayList<>();
377 
378     // process mutations:
379     Set<I> targets = previousMessages == null ?
380       Collections.EMPTY_SET : previousMessages.targetsSet();
381     if (createVertexOnMsgs) {
382       for (I target : targets) {
383         if (getPartition(target).getVertex(target) == null) {
384           mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
385         }
386       }
387     }
388 
389     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
390     for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
391       I vertexIndex = entry.getKey();
392       Vertex<I, V, E> originalVertex =
393           getPartition(vertexIndex).getVertex(vertexIndex);
394       VertexMutations<I, V, E> curMutations = entry.getValue();
395       Vertex<I, V, E> vertex = vertexResolver.resolve(
396           vertexIndex, originalVertex, curMutations,
397           targets.contains(vertexIndex));
398 
399       if (vertex != null) {
400         getPartition(vertex.getId()).putVertex(vertex);
401       } else if (originalVertex != null) {
402         getPartition(originalVertex.getId()).removeVertex(
403             originalVertex.getId());
404       }
405     }
406     mutations.clear();
407   }
408 
409   public List<Partition<I, V, E>> getPartitions() {
410     return partitions;
411   }
412 
413   public InternalWorkerApi getWorkerApi() {
414     return workerApi;
415   }
416 
417   @Override
418   public long getTotalNumEdges() {
419     int numEdges = 0;
420     for (Partition<I, V, E> partition : partitions) {
421       numEdges += partition.getEdgeCount();
422     }
423     return numEdges;
424   }
425 
426   @Override
427   public long getTotalNumVertices() {
428     int numVertices = 0;
429     for (Partition<I, V, E> partition : partitions) {
430       numVertices += partition.getVertexCount();
431     }
432     return numVertices;
433   }
434 
435   @Override
436   public void logToCommandLine(String line) {
437     System.err.println("Command line: " + line);
438   }
439 
440   @Override
441   public BlockOutputHandle getBlockOutputHandle() {
442     return workerContextLogic.getOutputHandle();
443   }
444 
445   @Override
446   public <OW extends BlockOutputWriter,
447       OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
448     return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
449         confOption);
450   }
451 
452   @Override
453   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
454     return workerContextLogic.getOutputHandle().getWriter(confOption);
455   }
456 
457   public BlockWorkerContextLogic getWorkerContextLogic() {
458     return workerContextLogic;
459   }
460 
461   @Override
462   public int getWorkerCount() {
463     return 1;
464   }
465 
466   private int getPartitionId(I id) {
467     Preconditions.checkNotNull(id);
468     return partitionerFactory.getPartition(id, partitions.size(), 1);
469   }
470 
471   private Partition<I, V, E> getPartition(I id) {
472     return partitions.get(getPartitionId(id));
473   }
474 
475   public void postApplication() {
476     for (Partition<I, V, E> partition : partitions) {
477       for (Vertex<I, V, E> vertex : partition) {
478         inputGraph.setVertex(vertex);
479       }
480     }
481   }
482 }