View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm;
20  
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import com.google.common.collect.Iterables;
28  import com.google.common.collect.Maps;
29  
30  import org.apache.giraph.bsp.CentralizedServiceWorker;
31  import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
32  import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
33  import org.apache.giraph.comm.messages.MessageStore;
34  import org.apache.giraph.comm.messages.MessageStoreFactory;
35  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
36  import org.apache.giraph.conf.GiraphConstants;
37  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
38  import org.apache.giraph.edge.EdgeStore;
39  import org.apache.giraph.edge.EdgeStoreFactory;
40  import org.apache.giraph.graph.Vertex;
41  import org.apache.giraph.graph.VertexMutations;
42  import org.apache.giraph.graph.VertexResolver;
43  import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
44  import org.apache.giraph.ooc.data.DiskBackedMessageStore;
45  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
46  import org.apache.giraph.ooc.OutOfCoreEngine;
47  import org.apache.giraph.partition.Partition;
48  import org.apache.giraph.partition.PartitionStore;
49  import org.apache.giraph.partition.SimplePartitionStore;
50  import org.apache.giraph.utils.ReflectionUtils;
51  import org.apache.hadoop.io.Writable;
52  import org.apache.hadoop.io.WritableComparable;
53  import org.apache.hadoop.mapreduce.Mapper;
54  import org.apache.log4j.Logger;
55  
56  import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
57  
58  /**
59   * Anything that the server stores
60   *
61   * @param <I> Vertex id
62   * @param <V> Vertex data
63   * @param <E> Edge data
64   */
65  @SuppressWarnings("rawtypes")
66  public class ServerData<I extends WritableComparable,
67      V extends Writable, E extends Writable> {
68    /** Class logger */
69    private static final Logger LOG = Logger.getLogger(ServerData.class);
70    /** Configuration */
71    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
72    /** Partition store for this worker. */
73    private volatile PartitionStore<I, V, E> partitionStore;
74    /** Edge store for this worker. */
75    private final EdgeStore<I, V, E> edgeStore;
76    /** Message store factory */
77    private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
78        messageStoreFactory;
79    /**
80     * Message store for incoming messages (messages which will be consumed
81     * in the next super step)
82     */
83    private volatile MessageStore<I, Writable> incomingMessageStore;
84    /**
85     * Message store for current messages (messages which we received in
86     * previous super step and which will be consumed in current super step)
87     */
88    private volatile MessageStore<I, Writable> currentMessageStore;
89    /**
90     * Map of partition ids to vertex mutations from other workers. These are
91     * mutations that should be applied before execution of *current* super step.
92     * (accesses to keys should be thread-safe as multiple threads may resolve
93     * mutations of different partitions at the same time)
94     */
95    private ConcurrentMap<Integer,
96        ConcurrentMap<I, VertexMutations<I, V, E>>>
97        oldPartitionMutations = Maps.newConcurrentMap();
98    /**
99     * Map of partition ids to vertex mutations from other workers. These are
100    * mutations that are coming from other workers as the execution goes one in a
101    * super step. These mutations should be applied in the *next* super step.
102    * (this should be thread-safe)
103    */
104   private ConcurrentMap<Integer,
105       ConcurrentMap<I, VertexMutations<I, V, E>>>
106       partitionMutations = Maps.newConcurrentMap();
107   /**
108    * Holds aggregators which current worker owns from current superstep
109    */
110   private final OwnerAggregatorServerData ownerAggregatorData;
111   /**
112    * Holds old aggregators from previous superstep
113    */
114   private final AllAggregatorServerData allAggregatorData;
115   /** Service worker */
116   private final CentralizedServiceWorker<I, V, E> serviceWorker;
117 
118   /** Store for current messages from other workers to this worker */
119   private volatile List<Writable> currentWorkerToWorkerMessages =
120       Collections.synchronizedList(new ArrayList<Writable>());
121   /** Store for message from other workers to this worker for next superstep */
122   private volatile List<Writable> incomingWorkerToWorkerMessages =
123       Collections.synchronizedList(new ArrayList<Writable>());
124 
125   /** Job context (for progress) */
126   private final Mapper<?, ?, ?, ?>.Context context;
127   /** Out-of-core engine */
128   private final OutOfCoreEngine oocEngine;
129 
130   /**
131    * Constructor.
132    *
133    * @param service Service worker
134    * @param conf Configuration
135    * @param context Mapper context
136    */
137   public ServerData(
138       CentralizedServiceWorker<I, V, E> service,
139       ImmutableClassesGiraphConfiguration<I, V, E> conf,
140       Mapper<?, ?, ?, ?>.Context context) {
141     this.serviceWorker = service;
142     this.conf = conf;
143     this.messageStoreFactory = createMessageStoreFactory();
144     EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
145     edgeStoreFactory.initialize(service, conf, context);
146     EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
147     PartitionStore<I, V, E> inMemoryPartitionStore =
148         new SimplePartitionStore<I, V, E>(conf, context);
149     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
150       oocEngine = new OutOfCoreEngine(conf, service);
151       partitionStore =
152           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
153               conf, context, oocEngine);
154       edgeStore =
155           new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
156     } else {
157       partitionStore = inMemoryPartitionStore;
158       edgeStore = inMemoryEdgeStore;
159       oocEngine = null;
160     }
161     ownerAggregatorData = new OwnerAggregatorServerData(context);
162     allAggregatorData = new AllAggregatorServerData(context, conf);
163     this.context = context;
164   }
165 
166   /**
167    * Decide which message store should be used for current application,
168    * and create the factory for that store
169    *
170    * @return Message store factory
171    */
172   private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
173   createMessageStoreFactory() {
174     Class<? extends MessageStoreFactory> messageStoreFactoryClass =
175         MESSAGE_STORE_FACTORY_CLASS.get(conf);
176 
177     MessageStoreFactory messageStoreFactoryInstance =
178         ReflectionUtils.newInstance(messageStoreFactoryClass);
179     messageStoreFactoryInstance.initialize(serviceWorker, conf);
180 
181     return messageStoreFactoryInstance;
182   }
183 
184   /**
185    * Return the out-of-core engine for this worker.
186    *
187    * @return The out-of-core engine
188    */
189   public OutOfCoreEngine getOocEngine() {
190     return oocEngine;
191   }
192 
193   /**
194    * Return the edge store for this worker.
195    *
196    * @return The edge store
197    */
198   public EdgeStore<I, V, E> getEdgeStore() {
199     return edgeStore;
200   }
201 
202   /**
203    * Return the partition store for this worker.
204    *
205    * @return The partition store
206    */
207   public PartitionStore<I, V, E> getPartitionStore() {
208     return partitionStore;
209   }
210 
211   /**
212    * Get message store for incoming messages (messages which will be consumed
213    * in the next super step)
214    *
215    * @param <M> Message data
216    * @return Incoming message store
217    */
218   public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
219     return (MessageStore<I, M>) incomingMessageStore;
220   }
221 
222   /**
223    * Get message store for current messages (messages which we received in
224    * previous super step and which will be consumed in current super step)
225    *
226    * @param <M> Message data
227    * @return Current message store
228    */
229   public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
230     return (MessageStore<I, M>) currentMessageStore;
231   }
232 
233   /**
234    * Re-initialize message stores.
235    * Discards old values if any.
236    */
237   public void resetMessageStores() {
238     if (currentMessageStore != null) {
239       currentMessageStore.clearAll();
240       currentMessageStore = null;
241     }
242     if (incomingMessageStore != null) {
243       incomingMessageStore.clearAll();
244       incomingMessageStore = null;
245     }
246     prepareSuperstep();
247   }
248 
249   /** Prepare for next superstep */
250   public void prepareSuperstep() {
251     if (currentMessageStore != null) {
252       currentMessageStore.clearAll();
253     }
254 
255     MessageStore<I, Writable> nextCurrentMessageStore;
256     MessageStore<I, Writable> nextIncomingMessageStore;
257     MessageStore<I, Writable> messageStore;
258 
259     // First create the necessary in-memory message stores. If out-of-core
260     // mechanism is enabled, we wrap the in-memory message stores within
261     // disk-backed messages stores.
262     if (incomingMessageStore != null) {
263       nextCurrentMessageStore = incomingMessageStore;
264     } else {
265       messageStore = messageStoreFactory.newStore(
266           conf.getIncomingMessageClasses());
267       if (oocEngine == null) {
268         nextCurrentMessageStore = messageStore;
269       } else {
270         nextCurrentMessageStore = new DiskBackedMessageStore<>(
271             conf, oocEngine, messageStore,
272             conf.getIncomingMessageClasses().useMessageCombiner(),
273             serviceWorker.getSuperstep());
274       }
275     }
276 
277     messageStore = messageStoreFactory.newStore(
278         conf.getOutgoingMessageClasses());
279     if (oocEngine == null) {
280       nextIncomingMessageStore = messageStore;
281     } else {
282       nextIncomingMessageStore = new DiskBackedMessageStore<>(
283           conf, oocEngine, messageStore,
284           conf.getOutgoingMessageClasses().useMessageCombiner(),
285           serviceWorker.getSuperstep() + 1);
286     }
287 
288     // If out-of-core engine is enabled, we avoid overlapping of out-of-core
289     // decisions with change of superstep. This avoidance is done to simplify
290     // the design and reduce excessive use of synchronization primitives.
291     if (oocEngine != null) {
292       oocEngine.getSuperstepLock().writeLock().lock();
293     }
294     currentMessageStore = nextCurrentMessageStore;
295     incomingMessageStore = nextIncomingMessageStore;
296     if (oocEngine != null) {
297       oocEngine.reset();
298       oocEngine.getSuperstepLock().writeLock().unlock();
299     }
300     currentMessageStore.finalizeStore();
301 
302     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
303     incomingWorkerToWorkerMessages =
304         Collections.synchronizedList(new ArrayList<Writable>());
305   }
306 
307   /**
308    * Get the vertex mutations (synchronize on the values)
309    *
310    * @return Vertex mutations
311    */
312   public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
313   getPartitionMutations() {
314     return partitionMutations;
315   }
316 
317   /**
318    * Get holder for aggregators which current worker owns
319    *
320    * @return Holder for aggregators which current worker owns
321    */
322   public OwnerAggregatorServerData getOwnerAggregatorData() {
323     return ownerAggregatorData;
324   }
325 
326   /**
327    * Get holder for aggregators from previous superstep
328    *
329    * @return Holder for aggregators from previous superstep
330    */
331   public AllAggregatorServerData getAllAggregatorData() {
332     return allAggregatorData;
333   }
334 
335   /**
336    * Get the reference of the service worker.
337    *
338    * @return CentralizedServiceWorker
339    */
340   public CentralizedServiceWorker<I, V, E> getServiceWorker() {
341     return this.serviceWorker;
342   }
343 
344   /**
345    * Get and clear worker to worker messages for this superstep. Can be
346    * called only once per superstep.
347    *
348    * @return List of messages for this worker
349    */
350   public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
351     List<Writable> ret = currentWorkerToWorkerMessages;
352     currentWorkerToWorkerMessages = null;
353     return ret;
354   }
355 
356   /**
357    * Add incoming message to this worker for next superstep. Thread-safe.
358    *
359    * @param message Message received
360    */
361   public void addIncomingWorkerToWorkerMessage(Writable message) {
362     incomingWorkerToWorkerMessages.add(message);
363   }
364 
365 
366   /**
367    * Get worker to worker messages received in previous superstep.
368    * @return list of current worker to worker messages.
369    */
370   public List<Writable> getCurrentWorkerToWorkerMessages() {
371     return currentWorkerToWorkerMessages;
372   }
373 
374   /**
375    * Prepare resolving mutation.
376    */
377   public void prepareResolveMutations() {
378     oldPartitionMutations = partitionMutations;
379     partitionMutations = Maps.newConcurrentMap();
380   }
381 
382   /**
383    * Resolve mutations specific for a partition. This method is called once
384    * per partition, before the computation for that partition starts.
385    * @param partition The partition to resolve mutations for
386    */
387   public void resolvePartitionMutation(Partition<I, V, E> partition) {
388     Integer partitionId = partition.getId();
389     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
390     ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
391         oldPartitionMutations.get(partitionId);
392 
393     // Resolve mutations that are explicitly sent for this partition
394     if (prevPartitionMutations != null) {
395       for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
396           .entrySet()) {
397         I vertexId = entry.getKey();
398         Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
399         VertexMutations<I, V, E> vertexMutations = entry.getValue();
400         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
401             originalVertex, vertexMutations,
402             getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
403 
404         if (LOG.isDebugEnabled()) {
405           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
406               vertexId + " in partition index " + partitionId +
407               " with original vertex " + originalVertex +
408               ", returned vertex " + vertex + " on superstep " +
409               serviceWorker.getSuperstep() + " with mutations " +
410               vertexMutations);
411         }
412 
413         if (vertex != null) {
414           partition.putVertex(vertex);
415         } else if (originalVertex != null) {
416           partition.removeVertex(vertexId);
417           getCurrentMessageStore().clearVertexMessages(vertexId);
418         }
419         context.progress();
420       }
421     }
422 
423     // Keep track of vertices which are not here in the partition, but have
424     // received messages
425     Iterable<I> destinations = getCurrentMessageStore().
426         getPartitionDestinationVertices(partitionId);
427     if (!Iterables.isEmpty(destinations)) {
428       for (I vertexId : destinations) {
429         if (partition.getVertex(vertexId) == null) {
430           Vertex<I, V, E> vertex =
431               vertexResolver.resolve(vertexId, null, null, true);
432 
433           if (LOG.isDebugEnabled()) {
434             LOG.debug("resolvePartitionMutations: A non-existing vertex has " +
435                 "message(s). Added vertex index " + vertexId +
436                 " in partition index " + partitionId +
437                 ", vertex = " + vertex + ", on superstep " +
438                 serviceWorker.getSuperstep());
439           }
440 
441           if (vertex != null) {
442             partition.putVertex(vertex);
443           }
444           context.progress();
445         }
446       }
447     }
448   }
449 
450   /**
451    * In case of async message store we have to wait for all messages
452    * to be processed before going into next superstep.
453    */
454   public void waitForComplete() {
455     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
456       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
457     }
458   }
459 }