This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.comm;
20  
21  import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
22  
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.ConcurrentMap;
28  
29  import org.apache.giraph.bsp.CentralizedServiceWorker;
30  import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
31  import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.comm.messages.MessageStoreFactory;
34  import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
35  import org.apache.giraph.conf.GiraphConstants;
36  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
37  import org.apache.giraph.edge.EdgeStore;
38  import org.apache.giraph.edge.EdgeStoreFactory;
39  import org.apache.giraph.graph.Vertex;
40  import org.apache.giraph.graph.VertexMutations;
41  import org.apache.giraph.graph.VertexResolver;
42  import org.apache.giraph.ooc.OutOfCoreEngine;
43  import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
44  import org.apache.giraph.ooc.data.DiskBackedMessageStore;
45  import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
46  import org.apache.giraph.partition.Partition;
47  import org.apache.giraph.partition.PartitionStore;
48  import org.apache.giraph.partition.SimplePartitionStore;
49  import org.apache.giraph.utils.ReflectionUtils;
50  import org.apache.hadoop.io.Writable;
51  import org.apache.hadoop.io.WritableComparable;
52  import org.apache.hadoop.mapreduce.Mapper;
53  import org.apache.log4j.Logger;
54  
55  import com.google.common.collect.Iterables;
56  import com.google.common.collect.Maps;
57  
58  
59  
60  
61  
62  
63  
64  
65  @SuppressWarnings("rawtypes")
66  public class ServerData<I extends WritableComparable,
67      V extends Writable, E extends Writable> {
68    
69    private static final Logger LOG = Logger.getLogger(ServerData.class);
70    
71    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
72    
73    private volatile PartitionStore<I, V, E> partitionStore;
74    
75    private final EdgeStore<I, V, E> edgeStore;
76    
77    private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
78        messageStoreFactory;
79    
80  
81  
82  
83    private volatile MessageStore<I, Writable> incomingMessageStore;
84    
85  
86  
87  
88    private volatile MessageStore<I, Writable> currentMessageStore;
89    
90  
91  
92  
93  
94  
95    private ConcurrentMap<Integer,
96        ConcurrentMap<I, VertexMutations<I, V, E>>>
97        oldPartitionMutations = Maps.newConcurrentMap();
98    
99  
100 
101 
102 
103 
104   private ConcurrentMap<Integer,
105       ConcurrentMap<I, VertexMutations<I, V, E>>>
106       partitionMutations = Maps.newConcurrentMap();
107   
108 
109 
110   private final OwnerAggregatorServerData ownerAggregatorData;
111   
112 
113 
114   private final AllAggregatorServerData allAggregatorData;
115   
116   private final CentralizedServiceWorker<I, V, E> serviceWorker;
117 
118   
119   private volatile List<Writable> currentWorkerToWorkerMessages =
120       Collections.synchronizedList(new ArrayList<Writable>());
121   
122   private volatile List<Writable> incomingWorkerToWorkerMessages =
123       Collections.synchronizedList(new ArrayList<Writable>());
124 
125   
126   private final Mapper<?, ?, ?, ?>.Context context;
127   
128   private final OutOfCoreEngine oocEngine;
129 
130   
131 
132 
133 
134 
135 
136 
137 
138   public ServerData(
139       CentralizedServiceWorker<I, V, E> service,
140       WorkerServer workerServer,
141       ImmutableClassesGiraphConfiguration<I, V, E> conf,
142       Mapper<?, ?, ?, ?>.Context context) {
143     this.serviceWorker = service;
144     this.conf = conf;
145     this.messageStoreFactory = createMessageStoreFactory();
146     EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
147     edgeStoreFactory.initialize(service, conf, context);
148     EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
149     PartitionStore<I, V, E> inMemoryPartitionStore =
150         new SimplePartitionStore<I, V, E>(conf, context);
151     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
152       oocEngine = new OutOfCoreEngine(conf, service, workerServer);
153       partitionStore =
154           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
155               conf, context, oocEngine);
156       edgeStore =
157           new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
158     } else {
159       partitionStore = inMemoryPartitionStore;
160       edgeStore = inMemoryEdgeStore;
161       oocEngine = null;
162     }
163     ownerAggregatorData = new OwnerAggregatorServerData(context);
164     allAggregatorData = new AllAggregatorServerData(context, conf);
165     this.context = context;
166   }
167 
168   
169 
170 
171 
172 
173 
174   private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
175   createMessageStoreFactory() {
176     Class<? extends MessageStoreFactory> messageStoreFactoryClass =
177         MESSAGE_STORE_FACTORY_CLASS.get(conf);
178 
179     MessageStoreFactory messageStoreFactoryInstance =
180         ReflectionUtils.newInstance(messageStoreFactoryClass);
181     messageStoreFactoryInstance.initialize(serviceWorker, conf);
182 
183     return messageStoreFactoryInstance;
184   }
185 
186   
187 
188 
189 
190 
191   public OutOfCoreEngine getOocEngine() {
192     return oocEngine;
193   }
194 
195   
196 
197 
198 
199 
200   public EdgeStore<I, V, E> getEdgeStore() {
201     return edgeStore;
202   }
203 
204   
205 
206 
207 
208 
209   public PartitionStore<I, V, E> getPartitionStore() {
210     return partitionStore;
211   }
212 
213   
214 
215 
216 
217 
218 
219 
220   public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
221     return (MessageStore<I, M>) incomingMessageStore;
222   }
223 
224   
225 
226 
227 
228 
229 
230 
231   public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
232     return (MessageStore<I, M>) currentMessageStore;
233   }
234 
235   
236 
237 
238 
239   public void resetMessageStores() {
240     if (currentMessageStore != null) {
241       currentMessageStore.clearAll();
242       currentMessageStore = null;
243     }
244     if (incomingMessageStore != null) {
245       incomingMessageStore.clearAll();
246       incomingMessageStore = null;
247     }
248     prepareSuperstep();
249   }
250 
251   
252   public void prepareSuperstep() {
253     if (currentMessageStore != null) {
254       currentMessageStore.clearAll();
255     }
256 
257     MessageStore<I, Writable> nextCurrentMessageStore;
258     MessageStore<I, Writable> nextIncomingMessageStore;
259     MessageStore<I, Writable> messageStore;
260 
261     
262     
263     
264     if (incomingMessageStore != null) {
265       nextCurrentMessageStore = incomingMessageStore;
266     } else {
267       messageStore = messageStoreFactory.newStore(
268           conf.getIncomingMessageClasses());
269       if (oocEngine == null) {
270         nextCurrentMessageStore = messageStore;
271       } else {
272         nextCurrentMessageStore = new DiskBackedMessageStore<>(
273             conf, oocEngine, messageStore,
274             conf.getIncomingMessageClasses().useMessageCombiner(),
275             serviceWorker.getSuperstep());
276       }
277     }
278 
279     messageStore = messageStoreFactory.newStore(
280         conf.getOutgoingMessageClasses());
281     if (oocEngine == null) {
282       nextIncomingMessageStore = messageStore;
283     } else {
284       nextIncomingMessageStore = new DiskBackedMessageStore<>(
285           conf, oocEngine, messageStore,
286           conf.getOutgoingMessageClasses().useMessageCombiner(),
287           serviceWorker.getSuperstep() + 1);
288     }
289 
290     
291     
292     
293     if (oocEngine != null) {
294       oocEngine.getSuperstepLock().writeLock().lock();
295     }
296     currentMessageStore = nextCurrentMessageStore;
297     incomingMessageStore = nextIncomingMessageStore;
298     if (oocEngine != null) {
299       oocEngine.reset();
300       oocEngine.getSuperstepLock().writeLock().unlock();
301     }
302     currentMessageStore.finalizeStore();
303 
304     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
305     incomingWorkerToWorkerMessages =
306         Collections.synchronizedList(new ArrayList<Writable>());
307   }
308 
309   
310 
311 
312 
313 
314   public ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
315   getPartitionMutations() {
316     return partitionMutations;
317   }
318 
319   
320 
321 
322 
323 
324   public OwnerAggregatorServerData getOwnerAggregatorData() {
325     return ownerAggregatorData;
326   }
327 
328   
329 
330 
331 
332 
333   public AllAggregatorServerData getAllAggregatorData() {
334     return allAggregatorData;
335   }
336 
337   
338 
339 
340 
341 
342   public CentralizedServiceWorker<I, V, E> getServiceWorker() {
343     return this.serviceWorker;
344   }
345 
346   
347 
348 
349 
350 
351 
352   public List<Writable> getAndClearCurrentWorkerToWorkerMessages() {
353     List<Writable> ret = currentWorkerToWorkerMessages;
354     currentWorkerToWorkerMessages = null;
355     return ret;
356   }
357 
358   
359 
360 
361 
362 
363   public void addIncomingWorkerToWorkerMessage(Writable message) {
364     incomingWorkerToWorkerMessages.add(message);
365   }
366 
367 
368   
369 
370 
371 
372   public List<Writable> getCurrentWorkerToWorkerMessages() {
373     return currentWorkerToWorkerMessages;
374   }
375 
376   
377 
378 
379   public void prepareResolveMutations() {
380     oldPartitionMutations = partitionMutations;
381     partitionMutations = Maps.newConcurrentMap();
382   }
383 
384   
385 
386 
387 
388 
389   public void resolvePartitionMutation(Partition<I, V, E> partition) {
390     Integer partitionId = partition.getId();
391     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
392     ConcurrentMap<I, VertexMutations<I, V, E>> prevPartitionMutations =
393         oldPartitionMutations.get(partitionId);
394 
395     boolean ignoreExistingVertices =
396         conf.getIncomingMessageClasses().ignoreExistingVertices();
397 
398     
399     if (prevPartitionMutations != null) {
400       for (Map.Entry<I, VertexMutations<I, V, E>> entry : prevPartitionMutations
401           .entrySet()) {
402         I vertexId = entry.getKey();
403         Vertex<I, V, E> originalVertex = partition.getVertex(vertexId);
404         VertexMutations<I, V, E> vertexMutations = entry.getValue();
405         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
406             originalVertex, vertexMutations,
407             !ignoreExistingVertices &&
408             getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
409 
410         if (LOG.isDebugEnabled()) {
411           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
412               vertexId + " in partition index " + partitionId +
413               " with original vertex " + originalVertex +
414               ", returned vertex " + vertex + " on superstep " +
415               serviceWorker.getSuperstep() + " with mutations " +
416               vertexMutations);
417         }
418 
419         if (vertex != null) {
420           partition.putVertex(vertex);
421         } else if (originalVertex != null) {
422           partition.removeVertex(vertexId);
423           if (!ignoreExistingVertices) {
424             getCurrentMessageStore().clearVertexMessages(vertexId);
425           }
426         }
427         context.progress();
428       }
429     }
430 
431     if (!ignoreExistingVertices) {
432       
433       
434       Iterable<I> destinations = getCurrentMessageStore().
435           getPartitionDestinationVertices(partitionId);
436       if (!Iterables.isEmpty(destinations)) {
437         for (I vertexId : destinations) {
438           if (partition.getVertex(vertexId) == null) {
439             Vertex<I, V, E> vertex =
440                 vertexResolver.resolve(vertexId, null, null, true);
441 
442             if (LOG.isDebugEnabled()) {
443               LOG.debug(
444                   "resolvePartitionMutations: A non-existing vertex has " +
445                   "message(s). Added vertex index " + vertexId +
446                   " in partition index " + partitionId +
447                   ", vertex = " + vertex + ", on superstep " +
448                   serviceWorker.getSuperstep());
449             }
450 
451             if (vertex != null) {
452               partition.putVertex(vertex);
453             }
454             context.progress();
455           }
456         }
457       }
458     }
459   }
460 
461   
462 
463 
464 
465   public void waitForComplete() {
466     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
467       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
468     }
469   }
470 }