This project has retired. For details please refer to its Attic page.
InternalApi xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import static com.google.common.base.Preconditions.checkState;
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Iterator;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ThreadLocalRandom;
31  
32  import org.apache.giraph.aggregators.Aggregator;
33  import org.apache.giraph.block_app.framework.api.BlockMasterApi;
34  import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
35  import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
36  import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
37  import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
38  import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
39  import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
40  import org.apache.giraph.block_app.framework.api.Counter;
41  import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
42  import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
43  import org.apache.giraph.block_app.framework.internal.BlockCounters;
44  import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
45  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
46  import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
47  import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
48  import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
49  import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
50  import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
51  import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
52  import org.apache.giraph.comm.messages.PartitionSplitInfo;
53  import org.apache.giraph.conf.GiraphConstants;
54  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
55  import org.apache.giraph.conf.MessageClasses;
56  import org.apache.giraph.edge.Edge;
57  import org.apache.giraph.edge.OutEdges;
58  import org.apache.giraph.graph.Vertex;
59  import org.apache.giraph.graph.VertexMutations;
60  import org.apache.giraph.graph.VertexResolver;
61  import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
62  import org.apache.giraph.partition.GraphPartitionerFactory;
63  import org.apache.giraph.partition.Partition;
64  import org.apache.giraph.reducers.ReduceOperation;
65  import org.apache.giraph.utils.TestGraph;
66  import org.apache.giraph.utils.WritableUtils;
67  import org.apache.giraph.worker.WorkerAggregatorDelegator;
68  import org.apache.giraph.worker.WorkerGlobalCommUsage;
69  import org.apache.hadoop.io.Writable;
70  import org.apache.hadoop.io.WritableComparable;
71  
72  import com.google.common.base.Preconditions;
73  
74  import it.unimi.dsi.fastutil.ints.IntArrayList;
75  import it.unimi.dsi.fastutil.ints.IntList;
76  
77  /**
78   * Internal implementation of Block API interfaces - representing an in-memory
79   * giraph instance.
80   *
81   * @param <I> Vertex id type
82   * @param <V> Vertex value type
83   * @param <E> Edge value type
84   */
85  @SuppressWarnings({ "rawtypes", "unchecked" })
86  class InternalApi<I extends WritableComparable, V extends Writable,
87      E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
88    private final TestGraph<I, V, E> inputGraph;
89    private final List<Partition<I, V, E>> partitions;
90    private final GraphPartitionerFactory<I, V, E> partitionerFactory;
91  
92    private final ImmutableClassesGiraphConfiguration conf;
93    private final boolean runAllChecks;
94    private final InternalAggregators globalComm;
95    private final AggregatorToGlobalCommTranslation aggregators;
96  
97    private final boolean createVertexOnMsgs;
98    private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations;
99  
100   private InternalMessageStore previousMessages;
101   private InternalMessageStore nextMessages;
102 
103   private MessageClasses previousMessageClasses;
104   private MessageClasses nextMessageClasses;
105 
106   private final InternalWorkerApi workerApi;
107   private final BlockWorkerContextLogic workerContextLogic;
108   private List<Writable> previousWorkerMessages;
109   private List<Writable> nextWorkerMessages;
110 
111   public InternalApi(
112       TestGraph<I, V, E> graph,
113       ImmutableClassesGiraphConfiguration conf,
114       int numPartitions,
115       boolean runAllChecks) {
116     this.inputGraph = graph;
117     this.partitions = new ArrayList<>(numPartitions);
118     for (int i = 0; i < numPartitions; i++) {
119       this.partitions.add(conf.createPartition(i, null));
120     }
121     this.partitionerFactory = conf.createGraphPartitioner();
122     Preconditions.checkNotNull(this.partitionerFactory);
123     Preconditions.checkState(this.partitions.size() == numPartitions);
124 
125     for (Vertex<I, V, E> vertex : graph) {
126       getPartition(vertex.getId()).putVertex(vertex);
127     }
128     graph.clear();
129 
130     this.conf = conf;
131     this.runAllChecks = runAllChecks;
132     this.globalComm = new InternalAggregators(runAllChecks);
133     this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm);
134     this.mutations = new ConcurrentHashMap<>();
135     this.workerApi = new InternalWorkerApi();
136     this.workerApi.setConf(conf);
137     this.workerApi.setWorkerGlobalCommUsage(this.globalComm);
138 
139     this.createVertexOnMsgs =
140         GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf);
141     workerContextLogic = new BlockWorkerContextLogic();
142   }
143 
144   /**
145    * Wrapper for calling Worker API interface.
146    * Needs to be separate from Master API, since getAggregatedValue
147    * has different implementation on worker and on master.
148    */
149   class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
150       implements BlockWorkerSendApi<I, V, E, Writable>,
151       BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>,
152       BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor,
153       WorkerGlobalCommUsage {
154 
155     @Override
156     public void addVertexRequest(I id, V value) {
157       addVertexRequest(id, value, conf.createAndInitializeOutEdges());
158     }
159 
160     @Override
161     public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
162       Vertex<I, V, E> vertex = conf.createVertex();
163       vertex.initialize(id, value, edges);
164       getMutationFor(id).addVertex(vertex);
165     }
166 
167     @Override
168     public void removeVertexRequest(I vertexId) {
169       getMutationFor(vertexId).removeVertex();
170     }
171 
172     @Override
173     public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
174       getMutationFor(sourceVertexId).addEdge(edge);
175     }
176 
177     @Override
178     public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
179       getMutationFor(sourceVertexId).removeEdge(targetVertexId);
180     }
181 
182     @Override
183     public void sendMessage(I id, Writable message) {
184       nextMessages.sendMessage(id, message);
185     }
186 
187     @Override
188     public void sendMessageToAllEdges(
189         Vertex<I, V, E> vertex, Writable message) {
190       sendMessageToMultipleEdges(
191           new TargetVertexIdIterator<>(vertex),
192           message);
193     }
194 
195     @Override
196     public void sendMessageToMultipleEdges(
197         Iterator<I> vertexIdIterator, Writable message) {
198       nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message);
199     }
200 
201     @Override
202     public int getMyWorkerIndex() {
203       return 0;
204     }
205 
206     @Override
207     public int getWorkerCount() {
208       return 1;
209     }
210 
211     @Override
212     public int getWorkerForVertex(I vertexId) {
213       return 0;
214     }
215 
216     @Override
217     public void sendMessageToWorker(Writable message, int workerIndex) {
218       Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
219           "With just one worker you can only send worker message to itself, " +
220               "but tried to send to " + workerIndex);
221       nextWorkerMessages.add(message);
222     }
223 
224     @Override
225     public Object getWorkerValue() {
226       return workerContextLogic.getWorkerValue();
227     }
228 
229     @Override
230     public long getTotalNumVertices() {
231       return InternalApi.this.getTotalNumVertices();
232     }
233 
234     @Override
235     public long getTotalNumEdges() {
236       return InternalApi.this.getTotalNumEdges();
237     }
238 
239     @Override
240     public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
241     OD getOutputDesc(String confOption) {
242       return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
243           confOption);
244     }
245 
246     @Override
247     public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
248       return workerContextLogic.getOutputHandle().getWriter(confOption);
249     }
250 
251     @Override
252     public void setStatus(String status) {
253     }
254 
255     @Override
256     public void progress() {
257     }
258 
259     @Override
260     public Counter getCounter(final String group, final String name) {
261       return BlockCounters.getNoOpCounter();
262     }
263   }
264 
265   @Override
266   public void broadcast(String name, Writable value) {
267     globalComm.broadcast(name, value);
268   }
269 
270   @Override
271   public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
272     BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
273     broadcast(handle.getName(), object);
274     return handle;
275   }
276 
277   @Override
278   public <S, R extends Writable> void registerReducer(
279       String name, ReduceOperation<S, R> reduceOp) {
280     globalComm.registerReducer(name, reduceOp);
281   }
282 
283   @Override
284   public <S, R extends Writable> void registerReducer(
285       String name, ReduceOperation<S, R> reduceOp,
286       R globalInitialValue) {
287     globalComm.registerReducer(name, reduceOp, globalInitialValue);
288   }
289 
290   @Override
291   public <R extends Writable> R getReduced(String name) {
292     return globalComm.getReduced(name);
293   }
294 
295   @Override
296   public <A extends Writable> A getAggregatedValue(String name) {
297     return aggregators.getAggregatedValue(name);
298   }
299 
300   @Override
301   public <A extends Writable> void setAggregatedValue(String name, A value) {
302     aggregators.setAggregatedValue(name, value);
303   }
304 
305   @Override
306   public <A extends Writable>
307   boolean registerAggregator(
308       String name, Class<? extends Aggregator<A>> aggregatorClass)
309       throws InstantiationException, IllegalAccessException {
310     return aggregators.registerAggregator(name, aggregatorClass);
311   }
312 
313   @Override
314   public <A extends Writable>
315   boolean registerPersistentAggregator(
316       String name, Class<? extends Aggregator<A>> aggregatorClass)
317       throws InstantiationException, IllegalAccessException {
318     return aggregators.registerPersistentAggregator(name, aggregatorClass);
319   }
320 
321   @Override
322   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
323     return conf;
324   }
325 
  /**
   * No-op: the given status is ignored by this implementation.
   *
   * @param status Status string (unused)
   */
  @Override
  public void setStatus(String status) {
  }
329 
  /**
   * No-op: this implementation does nothing when progress is reported.
   */
  @Override
  public void progress() {
  }
333 
334   @Override
335   public Counter getCounter(final String group, final String name) {
336     return BlockCounters.getNoOpCounter();
337   }
338 
339   private VertexMutations<I, V, E> getMutationFor(I vertexId) {
340     VertexMutations<I, V, E> curMutations = new VertexMutations<>();
341     VertexMutations<I, V, E> prevMutations =
342         mutations.putIfAbsent(vertexId, curMutations);
343     if (prevMutations != null) {
344       curMutations = prevMutations;
345     }
346     return curMutations;
347   }
348 
349   public Iterable takeMessages(I id) {
350     if (previousMessages != null) {
351       Iterable result = previousMessages.takeMessages(id);
352       if (result != null) {
353         return result;
354       }
355     }
356     return Collections.emptyList();
357   }
358 
359   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
360     if (previousMessages != null) {
361       Iterable result =
362           previousMessages.getPartitionDestinationVertices(partitionId);
363       if (result != null) {
364         return result;
365       }
366     }
367     return Collections.emptyList();
368   }
369 
370   public List<Writable> takeWorkerMessages() {
371     if (previousWorkerMessages != null) {
372       List<Writable> ret = new ArrayList<>(previousWorkerMessages.size());
373       for (Writable message : previousWorkerMessages) {
374         // Use message copies probabilistically, to catch both not serializing
375         // some fields, and storing references from message object itself
376         // (which can be reusable).
377         ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ?
378             WritableUtils.createCopy(message) : message);
379       }
380       previousWorkerMessages = null;
381       if (runAllChecks) {
382         Collections.shuffle(ret);
383       }
384       return ret;
385     }
386     return Collections.emptyList();
387   }
388 
389   public void afterWorkerBeforeMaster() {
390     globalComm.afterWorkerBeforeMaster();
391     aggregators.prepareSuperstep();
392   }
393 
394   public void afterMasterBeforeWorker() {
395     aggregators.postMasterCompute();
396   }
397 
  /**
   * Hook between master and worker execution for the given worker pieces.
   *
   * In order, it: calls {@link #afterMasterBeforeWorker()}; makes the
   * "next" message store, message classes and worker messages the
   * "previous" ones and creates fresh "next" ones for the outgoing message
   * classes of the computation; finalizes the previous message store;
   * resolves and applies all pending vertex mutations; and, if vertex
   * creation on messages is enabled and existing vertices are not ignored,
   * resolves vertices for message targets that are not in any partition.
   *
   * @param computation Worker pieces whose outgoing message classes are
   *                    used to create the next message store
   */
  public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
    afterMasterBeforeWorker();

    // What was being sent so far becomes what is received from now on
    previousMessages = nextMessages;
    previousMessageClasses = nextMessageClasses;
    previousWorkerMessages = nextWorkerMessages;

    // Fresh outgoing state, based on the computation's message classes
    nextMessageClasses = computation.getOutgoingMessageClasses(conf);
    nextMessages = createMessageStore(
      conf,
      nextMessageClasses,
      createPartitionInfo(),
      runAllChecks
    );
    nextWorkerMessages = new ArrayList<>();

    // finalize previous messages
    if (previousMessages != null) {
      previousMessages.finalizeStore();
    }

    // Evaluated after the rotation above, so it reflects the message
    // classes that are now the previous ones
    boolean ignoreExistingVertices = ignoreExistingVertices();

    // process mutations:
    VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
    for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
      I vertexIndex = entry.getKey();
      Vertex<I, V, E> originalVertex =
          getPartition(vertexIndex).getVertex(vertexIndex);
      VertexMutations<I, V, E> curMutations = entry.getValue();
      // Last argument tells the resolver whether the vertex has messages;
      // it is always false when existing vertices are ignored or there is
      // no previous message store
      Vertex<I, V, E> vertex = vertexResolver.resolve(
        vertexIndex,
        originalVertex,
        curMutations,
        !ignoreExistingVertices && previousMessages != null &&
        previousMessages.hasMessage(vertexIndex)
      );

      if (vertex != null) {
        getPartition(vertex.getId()).putVertex(vertex);
      } else if (originalVertex != null) {
        // Resolver returned no vertex although one existed: remove it
        getPartition(originalVertex.getId()).removeVertex(
            originalVertex.getId());
        if (!ignoreExistingVertices && previousMessages != null) {
          // Result is discarded: the messages of the removed vertex are
          // only taken out of the store
          previousMessages.takeMessages(originalVertex.getId());
        }
      }
    }
    mutations.clear();

    // Resolve vertices for message targets that are in no partition
    if (!ignoreExistingVertices && createVertexOnMsgs &&
        previousMessages != null) {
      Iterator<I> iter = previousMessages.targetVertexIds();
      while (iter.hasNext()) {
        I target = iter.next();
        if (getPartition(target).getVertex(target) == null) {
          // need a copy as the key might be reusable
          I copyId = WritableUtils.createCopy(target);

          // No original vertex and no mutations, but it has messages
          Vertex<I, V, E> vertex =
              vertexResolver.resolve(copyId, null, null, true);

          if (vertex != null) {
            getPartition(vertex.getId()).putVertex(vertex);
          }
        }
      }
    }
  }
467 
468   public boolean ignoreExistingVertices() {
469     return previousMessageClasses != null &&
470         previousMessageClasses.ignoreExistingVertices();
471   }
472 
473   private <M extends Writable>
474   InternalMessageStore<I, M> createMessageStore(
475     ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
476     MessageClasses<I, M> messageClasses,
477     PartitionSplitInfo<I> partitionInfo,
478     boolean runAllChecks
479   ) {
480     InternalMessageStore<I, M> messageStore =
481       InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo);
482     if (runAllChecks) {
483       return new InternalChecksMessageStore<I, M>(
484           messageStore, conf, messageClasses.createMessageValueFactory(conf));
485     } else {
486       return messageStore;
487     }
488   }
489 
490   private PartitionSplitInfo<I> createPartitionInfo() {
491     return new PartitionSplitInfo<I>() {
492       /** Ids of partitions */
493       private IntList partitionIds;
494       /** Queue of partitions to be precessed in a superstep */
495       private Queue<Partition<I, V, E>> partitionQueue;
496 
497       @Override
498       public int getPartitionId(I vertexId) {
499         return partitionerFactory.getPartition(vertexId, partitions.size(), 1);
500       }
501 
502       @Override
503       public Iterable<Integer> getPartitionIds() {
504         if (partitionIds == null) {
505           partitionIds = new IntArrayList(partitions.size());
506           for (int i = 0; i < partitions.size(); i++) {
507             partitionIds.add(i);
508           }
509         }
510         Preconditions.checkState(partitionIds.size() == partitions.size());
511         return partitionIds;
512       }
513 
514       @Override
515       public long getPartitionVertexCount(Integer partitionId) {
516         return partitions.get(partitionId).getVertexCount();
517       }
518 
519       @Override
520       public void startIteration() {
521         checkState(partitionQueue == null || partitionQueue.isEmpty(),
522           "startIteration: It seems that some of " +
523           "of the partitions from previous iteration over partition store are" +
524           " not yet processed.");
525 
526         partitionQueue = new LinkedList<Partition<I, V, E>>();
527         for (Partition<I, V, E> partition : partitions) {
528           partitionQueue.add(partition);
529         }
530       }
531 
532       @Override
533       public Partition getNextPartition() {
534         return partitionQueue.poll();
535       }
536 
537       @Override
538       public void putPartition(Partition partition) {
539       }
540     };
541   }
542 
543   public List<Partition<I, V, E>> getPartitions() {
544     return partitions;
545   }
546 
547   public InternalWorkerApi getWorkerApi() {
548     return workerApi;
549   }
550 
551   @Override
552   public long getTotalNumEdges() {
553     int numEdges = 0;
554     for (Partition<I, V, E> partition : partitions) {
555       numEdges += partition.getEdgeCount();
556     }
557     return numEdges;
558   }
559 
560   @Override
561   public long getTotalNumVertices() {
562     int numVertices = 0;
563     for (Partition<I, V, E> partition : partitions) {
564       numVertices += partition.getVertexCount();
565     }
566     return numVertices;
567   }
568 
569   @Override
570   public void logToCommandLine(String line) {
571     System.err.println("Command line: " + line);
572   }
573 
574   @Override
575   public BlockOutputHandle getBlockOutputHandle() {
576     return workerContextLogic.getOutputHandle();
577   }
578 
579   @Override
580   public <OW extends BlockOutputWriter,
581       OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
582     return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
583         confOption);
584   }
585 
586   @Override
587   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
588     return workerContextLogic.getOutputHandle().getWriter(confOption);
589   }
590 
591   public BlockWorkerContextLogic getWorkerContextLogic() {
592     return workerContextLogic;
593   }
594 
  /**
   * Returns the number of workers, which is always 1 in this
   * implementation.
   *
   * @return 1
   */
  @Override
  public int getWorkerCount() {
    return 1;
  }
599 
600   private int getPartitionId(I id) {
601     Preconditions.checkNotNull(id);
602     return partitionerFactory.getPartition(id, partitions.size(), 1);
603   }
604 
605   private Partition<I, V, E> getPartition(I id) {
606     return partitions.get(getPartitionId(id));
607   }
608 
609   public void postApplication() {
610     for (Partition<I, V, E> partition : partitions) {
611       for (Vertex<I, V, E> vertex : partition) {
612         inputGraph.setVertex(vertex);
613       }
614     }
615   }
616 }