View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import com.google.common.collect.Maps;
22  import com.google.common.collect.Sets;
23  import com.google.common.util.concurrent.AtomicDouble;
24  import org.apache.giraph.bsp.BspService;
25  import org.apache.giraph.ooc.OutOfCoreEngine;
26  import org.apache.giraph.worker.BspServiceWorker;
27  import org.apache.giraph.worker.WorkerProgress;
28  import org.apache.log4j.Logger;
29  
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static com.google.common.base.Preconditions.checkState;
39  
40  /**
41   * Class to keep meta-information about partition data, edge data, and message
42   * data of each partition on a worker.
43   */
44  public class MetaPartitionManager {
45    /**
46     * Flag representing no partitions is left to process in the current iteration
47     * cycle over all partitions.
48     */
49    public static final int NO_PARTITION_TO_PROCESS = -1;
50  
51    /** Class logger */
52    private static final Logger LOG =
53        Logger.getLogger(MetaPartitionManager.class);
54    /** Different storage states for data */
55    private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
56    /**
57     * Different storage states for a partition as a whole (i.e. the partition
58     * and its current messages)
59     */
60    private enum PartitionStorageState
61      /**
62       * Either both partition and its current messages are in memory, or both
63       * are on disk, or one part is on disk and the other part is in memory.
64       */
65    { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
66    /**
67     * Different processing states for partitions. Processing states are reset
68     * at the beginning of each iteration cycle over partitions.
69     */
70    private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
71  
72    /**
73     * Number of partitions in-memory (partition and current messages in memory)
74     */
75    private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
76    /**
77     * Number of partitions that are partially in-memory (either partition or its
78     * current messages is in memory and the other part is not)
79     */
80    private final AtomicInteger numPartiallyInMemoryPartitions =
81        new AtomicInteger(0);
82    /** Map (dictionary) of partitions to their meta information */
83    private final ConcurrentMap<Integer, MetaPartition> partitions =
84        Maps.newConcurrentMap();
85    /** Reverse dictionaries of partitions assigned to each IO thread */
86    private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
87    /** For each IO thread, set of partition ids that are on-disk and have
88     * 'large enough' vertex/edge buffers to be offloaded on disk
89     */
90    private final List<Set<Integer>> perThreadVertexEdgeBuffers;
91    /**
92     * For each IO thread, set of partition ids that are on-disk and have
93     * 'large enough' message buffers to be offloaded on disk
94     */
95    private final List<Set<Integer>> perThreadMessageBuffers;
96    /**
97     * Out-of-core engine
98     */
99    private final OutOfCoreEngine oocEngine;
100   /**
101    * Number of processed partitions in the current iteration cycle over all
102    * partitions
103    */
104   private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);
105   /**
106    * Random number generator to choose a thread to get one of its partition for
107    * processing
108    */
109   private final Random randomGenerator;
110   /**
111    * What is the lowest fraction of partitions in memory, relative to the total
112    * number of available partitions? This is an indirect estimation of the
113    * amount of graph in memory, which can be used to estimate how many more
114    * machines needed to avoid out-of-core execution. At the beginning all the
115    * graph is in memory, so the fraction is 1. This fraction is calculated per
116    * superstep.
117    */
118   private final AtomicDouble lowestGraphFractionInMemory =
119       new AtomicDouble(1);
120   /**
121    * Map of partition ids to their indices. index of a partition is the order
122    * with which the partition has been inserted. Partitions are indexed as 0, 1,
123    * 2, etc. This indexing is later used to find the id of the IO thread who is
124    * responsible for handling a partition. Partitions are assigned to IO threads
125    * in a round-robin fashion based on their indices.
126    */
127   private final ConcurrentMap<Integer, Integer> partitionIndex =
128       Maps.newConcurrentMap();
129   /**
130    * Sequential counter used to assign indices to partitions as they are added
131    */
132   private final AtomicInteger indexCounter = new AtomicInteger(0);
133   /** How many disks (i.e. IO threads) do we have? */
134   private final int numIOThreads;
135 
136   /**
137    * Constructor
138    *
139    * @param numIOThreads number of IO threads
140    * @param oocEngine out-of-core engine
141    */
142   public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
143     perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
144     perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
145     perThreadMessageBuffers = new ArrayList<>(numIOThreads);
146     for (int i = 0; i < numIOThreads; ++i) {
147       perThreadPartitionDictionary.add(new MetaPartitionDictionary());
148       perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
149       perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
150     }
151     this.oocEngine = oocEngine;
152     this.randomGenerator = new Random();
153     this.numIOThreads = numIOThreads;
154   }
155 
156   /**
157    * @return number of partitions in memory
158    */
159   public int getNumInMemoryPartitions() {
160     return numInMemoryPartitions.get();
161   }
162 
163   /**
164    * @return number of partitions that are partially in memory
165    */
166   public int getNumPartiallyInMemoryPartitions() {
167     return numPartiallyInMemoryPartitions.get();
168   }
169 
170   /**
171    * Get total number of partitions
172    *
173    * @return total number of partitions
174    */
175   public int getNumPartitions() {
176     return partitions.size();
177   }
178 
179   /**
180    * Since the statistics are based on estimates, we assume each partial
181    * partition is taking about half of the full partition in terms of memory
182    * footprint.
183    *
184    * @return estimate of fraction of graph in memory
185    */
186   public double getGraphFractionInMemory() {
187     return (getNumInMemoryPartitions() +
188         getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
189   }
190 
191   /**
192    * Update the lowest fraction of graph in memory so to have a more accurate
193    * information in one of the counters.
194    */
195   private synchronized void updateGraphFractionInMemory() {
196     double graphInMemory = getGraphFractionInMemory();
197     if (graphInMemory < lowestGraphFractionInMemory.get()) {
198       lowestGraphFractionInMemory.set(graphInMemory);
199       WorkerProgress.get().updateLowestGraphPercentageInMemory(
200           (int) (graphInMemory * 100));
201     }
202   }
203 
204   /**
205    * Update the book-keeping about number of in-memory partitions and partially
206    * in-memory partitions with regard to the storage status of the partition and
207    * its current messages before and after an update to its status.
208    *
209    * @param stateBefore the storage state of the partition and its current
210    *                    messages before an update
211    * @param stateAfter the storage state of the partition and its current
212    *                   messages after an update
213    */
214   private void updateCounters(PartitionStorageState stateBefore,
215                               PartitionStorageState stateAfter) {
216     numInMemoryPartitions.getAndAdd(
217         ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
218             ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
219     numPartiallyInMemoryPartitions.getAndAdd(
220         ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
221             ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
222   }
223 
224   /**
225    * Whether a given partition is available
226    *
227    * @param partitionId id of the partition to check if this worker owns it
228    * @return true if the worker owns the partition, false otherwise
229    */
230   public boolean hasPartition(Integer partitionId) {
231     return partitions.containsKey(partitionId);
232   }
233 
234   /**
235    * Return the list of all available partitions as an iterable
236    *
237    * @return list of all available partitions
238    */
239   public Iterable<Integer> getPartitionIds() {
240     return partitions.keySet();
241   }
242 
243   /**
244    * Get the thread id that is responsible for a particular partition
245    *
246    * @param partitionId id of the given partition
247    * @return id of the thread responsible for the given partition
248    */
249   public int getOwnerThreadId(int partitionId) {
250     Integer index = partitionIndex.get(partitionId);
251     checkState(index != null);
252     return index % numIOThreads;
253   }
254 
255   /**
256    * Add a partition
257    *
258    * @param partitionId id of a partition to add
259    */
260   public void addPartition(int partitionId) {
261     MetaPartition meta = new MetaPartition(partitionId);
262     MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
263     // Check if the given partition is new
264     if (temp == null) {
265       int index = indexCounter.getAndIncrement();
266       checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
267       int ownerThread = getOwnerThreadId(partitionId);
268       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
269       numInMemoryPartitions.getAndIncrement();
270     }
271   }
272 
273   /**
274    * Remove a partition. This method assumes that the partition is already
275    * retrieved and is in memory)
276    *
277    * @param partitionId id of a partition to remove
278    */
279   public void removePartition(Integer partitionId) {
280     MetaPartition meta = partitions.remove(partitionId);
281     int ownerThread = getOwnerThreadId(partitionId);
282     perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
283     checkState(!meta.isOnDisk());
284     numInMemoryPartitions.getAndDecrement();
285   }
286 
287   /**
288    * Pops an entry from the specified set.
289    *
290    * @param set set to pop an entry from
291    * @param <T> Type of entries in the set
292    * @return popped entry from the given set
293    */
294   private static <T> T popFromSet(Set<T> set) {
295     if (!set.isEmpty()) {
296       Iterator<T> it = set.iterator();
297       T entry = it.next();
298       it.remove();
299       return entry;
300     }
301     return null;
302   }
303 
304   /**
305    * Peeks an entry from the specified set.
306    *
307    * @param set set to peek an entry from
308    * @param <T> Type of entries in the set
309    * @return peeked entry from the given set
310    */
311   private static <T> T peekFromSet(Set<T> set) {
312     if (!set.isEmpty()) {
313       return set.iterator().next();
314     }
315     return null;
316   }
317 
318   /**
319    * Get id of a partition to offload to disk. Prioritize offloading processed
320    * partitions over unprocessed partition. Also, prioritize offloading
321    * partitions partially in memory over partitions fully in memory.
322    *
323    * @param threadId id of the thread who is going to store the partition on
324    *                 disk
325    * @return id of the partition to offload on disk
326    */
327   public Integer getOffloadPartitionId(int threadId) {
328     // First, look for a processed partition partially on disk
329     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
330         ProcessingState.PROCESSED,
331         StorageState.IN_MEM,
332         StorageState.ON_DISK,
333         null);
334     if (meta != null) {
335       return meta.getPartitionId();
336     }
337     meta = perThreadPartitionDictionary.get(threadId).lookup(
338         ProcessingState.PROCESSED,
339         StorageState.ON_DISK,
340         StorageState.IN_MEM,
341         null);
342     if (meta != null) {
343       return meta.getPartitionId();
344     }
345     // Second, look for a processed partition entirely in memory
346     meta = perThreadPartitionDictionary.get(threadId).lookup(
347         ProcessingState.PROCESSED,
348         StorageState.IN_MEM,
349         StorageState.IN_MEM,
350         null);
351     if (meta != null) {
352       return meta.getPartitionId();
353     }
354 
355     // Third, look for an unprocessed partition partially on disk
356     meta = perThreadPartitionDictionary.get(threadId).lookup(
357         ProcessingState.UNPROCESSED,
358         StorageState.IN_MEM,
359         StorageState.ON_DISK,
360         null);
361     if (meta != null) {
362       return meta.getPartitionId();
363     }
364     meta = perThreadPartitionDictionary.get(threadId).lookup(
365         ProcessingState.UNPROCESSED,
366         StorageState.ON_DISK,
367         StorageState.IN_MEM,
368         null);
369     if (meta != null) {
370       return meta.getPartitionId();
371     }
372     // Forth, look for an unprocessed partition entirely in memory
373     meta = perThreadPartitionDictionary.get(threadId).lookup(
374         ProcessingState.UNPROCESSED,
375         StorageState.IN_MEM,
376         StorageState.IN_MEM,
377         null);
378     if (meta != null) {
379       return meta.getPartitionId();
380     }
381     return null;
382   }
383 
384   /**
385    * Get id of a partition to offload its vertex/edge buffers on disk
386    *
387    * @param threadId id of the thread who is going to store the buffers on disk
388    * @return id of the partition to offload its vertex/edge buffers on disk
389    */
390   public Integer getOffloadPartitionBufferId(int threadId) {
391     if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
392       Integer partitionId =
393           popFromSet(perThreadVertexEdgeBuffers.get(threadId));
394       if (partitionId == null) {
395         DiskBackedPartitionStore<?, ?, ?> partitionStore =
396             (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
397                 .getPartitionStore());
398         perThreadVertexEdgeBuffers.get(threadId)
399             .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
400         DiskBackedEdgeStore<?, ?, ?> edgeStore =
401             (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
402                 .getEdgeStore();
403         perThreadVertexEdgeBuffers.get(threadId)
404             .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
405         partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
406       }
407       return partitionId;
408     }
409     return null;
410   }
411 
412   /**
413    * Get id of a partition to offload its incoming message buffers on disk
414    *
415    * @param threadId id of the thread who is going to store the buffers on disk
416    * @return id of the partition to offload its message buffer on disk
417    */
418   public Integer getOffloadMessageBufferId(int threadId) {
419     if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
420       Integer partitionId =
421           popFromSet(perThreadMessageBuffers.get(threadId));
422       if (partitionId == null) {
423         DiskBackedMessageStore<?, ?> messageStore =
424             (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
425                 .getIncomingMessageStore());
426         if (messageStore != null) {
427           perThreadMessageBuffers.get(threadId)
428               .addAll(messageStore.getCandidateBuffersToOffload(threadId));
429           partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
430         }
431       }
432       return partitionId;
433     }
434     return null;
435   }
436 
437   /**
438    * Get id of a partition to offload its incoming message on disk. Prioritize
439    * offloading messages of partitions already on disk, and then partitions
440    * in-transit, over partitions in-memory. Also, prioritize processed
441    * partitions over unprocessed (processed partitions would go on disk with
442    * more chances that unprocessed partitions)
443    *
444    * @param threadId id of the thread who is going to store the incoming
445    *                 messages on disk
446    * @return id of the partition to offload its message on disk
447    */
448   public Integer getOffloadMessageId(int threadId) {
449     if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
450       return null;
451     }
452     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
453         ProcessingState.PROCESSED,
454         StorageState.ON_DISK,
455         null,
456         StorageState.IN_MEM);
457     if (meta != null) {
458       return meta.getPartitionId();
459     }
460     meta = perThreadPartitionDictionary.get(threadId).lookup(
461         ProcessingState.PROCESSED,
462         StorageState.IN_TRANSIT,
463         null,
464         StorageState.IN_MEM);
465     if (meta != null) {
466       return meta.getPartitionId();
467     }
468     meta = perThreadPartitionDictionary.get(threadId).lookup(
469         ProcessingState.UNPROCESSED,
470         StorageState.ON_DISK,
471         null,
472         StorageState.IN_MEM);
473     if (meta != null) {
474       return meta.getPartitionId();
475     }
476     meta = perThreadPartitionDictionary.get(threadId).lookup(
477         ProcessingState.UNPROCESSED,
478         StorageState.IN_TRANSIT,
479         null,
480         StorageState.IN_MEM);
481     if (meta != null) {
482       return meta.getPartitionId();
483     }
484     return null;
485   }
486 
487   /**
488    * Get id of a partition to load its data to memory. Prioritize loading an
489    * unprocessed partition over loading processed partition. Also, prioritize
490    * loading a partition partially in memory over partitions entirely on disk.
491    *
492    * @param threadId id of the thread who is going to load the partition data
493    * @return id of the partition to load its data to memory
494    */
495   public Integer getLoadPartitionId(int threadId) {
496     // First, look for an unprocessed partition partially in memory
497     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
498         ProcessingState.UNPROCESSED,
499         StorageState.IN_MEM,
500         StorageState.ON_DISK,
501         null);
502     if (meta != null) {
503       return meta.getPartitionId();
504     }
505 
506     meta = perThreadPartitionDictionary.get(threadId).lookup(
507         ProcessingState.UNPROCESSED,
508         StorageState.ON_DISK,
509         StorageState.IN_MEM,
510         null);
511     if (meta != null) {
512       return meta.getPartitionId();
513     }
514 
515     // Second, look for an unprocessed partition entirely on disk
516     meta = perThreadPartitionDictionary.get(threadId).lookup(
517         ProcessingState.UNPROCESSED,
518         StorageState.ON_DISK,
519         StorageState.ON_DISK,
520         null);
521     if (meta != null) {
522       return meta.getPartitionId();
523     }
524 
525     // Third, look for a processed partition partially in memory
526     meta = perThreadPartitionDictionary.get(threadId).lookup(
527         ProcessingState.PROCESSED,
528         StorageState.IN_MEM,
529         StorageState.ON_DISK,
530         null);
531     if (meta != null) {
532       return meta.getPartitionId();
533     }
534 
535     meta = perThreadPartitionDictionary.get(threadId).lookup(
536         ProcessingState.PROCESSED,
537         StorageState.ON_DISK,
538         StorageState.IN_MEM,
539         null);
540     if (meta != null) {
541       return meta.getPartitionId();
542     }
543 
544     // Forth, look for a processed partition entirely on disk
545     meta = perThreadPartitionDictionary.get(threadId).lookup(
546         ProcessingState.PROCESSED,
547         StorageState.ON_DISK,
548         StorageState.ON_DISK,
549         null);
550     if (meta != null) {
551       return meta.getPartitionId();
552     }
553 
554     return null;
555   }
556 
557   /**
558    * Mark a partition as being 'IN_PROCESS'
559    *
560    * @param partitionId id of the partition to mark
561    */
562   public void markPartitionAsInProcess(int partitionId) {
563     MetaPartition meta = partitions.get(partitionId);
564     int ownerThread = getOwnerThreadId(partitionId);
565     synchronized (meta) {
566       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
567       meta.setProcessingState(ProcessingState.IN_PROCESS);
568       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
569     }
570   }
571 
572   /**
573    * Whether there is any processed partition stored in memory (excluding those
574    * that are prefetched to execute in the next superstep).
575    *
576    * @return true iff there is any processed partition in memory
577    */
578   public boolean hasProcessedOnMemory() {
579     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
580       if (dictionary.hasProcessedOnMemory()) {
581         return true;
582       }
583     }
584     return false;
585   }
586 
587   /**
588    * Whether a partition is *processed* in the current iteration cycle over
589    * partitions.
590    *
591    * @param partitionId id of the partition to check
592    * @return true iff processing the given partition is done
593    */
594   public boolean isPartitionProcessed(Integer partitionId) {
595     MetaPartition meta = partitions.get(partitionId);
596     synchronized (meta) {
597       return meta.getProcessingState() == ProcessingState.PROCESSED;
598     }
599   }
600 
601   /**
602    * Mark a partition as 'PROCESSED'
603    *
604    * @param partitionId id of the partition to mark
605    */
606   public void setPartitionIsProcessed(int partitionId) {
607     MetaPartition meta = partitions.get(partitionId);
608     int ownerThread = getOwnerThreadId(partitionId);
609     synchronized (meta) {
610       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
611       meta.setProcessingState(ProcessingState.PROCESSED);
612       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
613     }
614     numPartitionsProcessed.getAndIncrement();
615   }
616 
617   /**
618    * Notify this meta store that load of a partition for a specific superstep
619    * is about to start.
620    *
621    * @param partitionId id of the partition to load to memory
622    * @param superstep superstep in which the partition is needed for
623    * @return true iff load of the given partition is viable
624    */
625   public boolean startLoadingPartition(int partitionId, long superstep) {
626     MetaPartition meta = partitions.get(partitionId);
627     synchronized (meta) {
628       boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
629       if (superstep == oocEngine.getSuperstep()) {
630         shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
631       } else {
632         shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
633       }
634       return shouldLoad;
635     }
636   }
637 
638   /**
639    * Notify this meta store that load of a partition for a specific superstep
640    * is completed
641    *
642    * @param partitionId id of the partition for which the load is completed
643    * @param superstep superstep in which the partition is loaded for
644    */
645   public void doneLoadingPartition(int partitionId, long superstep) {
646     MetaPartition meta = partitions.get(partitionId);
647     int owner = getOwnerThreadId(partitionId);
648     synchronized (meta) {
649       PartitionStorageState stateBefore = meta.getPartitionStorageState();
650       perThreadPartitionDictionary.get(owner).removePartition(meta);
651       meta.setPartitionState(StorageState.IN_MEM);
652       if (superstep == oocEngine.getSuperstep()) {
653         meta.setCurrentMessagesState(StorageState.IN_MEM);
654       } else {
655         meta.setIncomingMessagesState(StorageState.IN_MEM);
656       }
657       PartitionStorageState stateAfter = meta.getPartitionStorageState();
658       updateCounters(stateBefore, stateAfter);
659       // Check whether load was to prefetch a partition from disk to memory for
660       // the next superstep
661       if (meta.getProcessingState() == ProcessingState.PROCESSED) {
662         perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
663       }
664       perThreadPartitionDictionary.get(owner).addPartition(meta);
665     }
666     updateGraphFractionInMemory();
667   }
668 
669   /**
670    * Notify this meta store that offload of messages for a particular partition
671    * is about to start.
672    *
673    * @param partitionId id of the partition that its messages is being offloaded
674    * @return true iff offload of messages of the given partition is viable
675    */
676   public boolean startOffloadingMessages(int partitionId) {
677     MetaPartition meta = partitions.get(partitionId);
678     int ownerThread = getOwnerThreadId(partitionId);
679     synchronized (meta) {
680       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
681         perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
682         meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
683         perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
684         return true;
685       } else {
686         return false;
687       }
688     }
689   }
690 
691   /**
692    * Notify this meta store that offload of messages for a particular partition
693    * is complete.
694    *
695    * @param partitionId id of the partition that its messages is offloaded to
696    *                    disk
697    */
698   public void doneOffloadingMessages(int partitionId) {
699     MetaPartition meta = partitions.get(partitionId);
700     int ownerThread = getOwnerThreadId(partitionId);
701     synchronized (meta) {
702       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
703       meta.setIncomingMessagesState(StorageState.ON_DISK);
704       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
705     }
706   }
707 
708   /**
709    * Notify this meta store that offload of raw data buffers (vertex/edges/
710    * messages) of a particular partition is about to start.
711    *
712    * @param partitionId id of the partition that its buffer is being offloaded
713    * @return true iff offload of buffers of the given partition is viable
714    */
715   public boolean startOffloadingBuffer(int partitionId) {
716     // Do nothing
717     return true;
718   }
719 
720   /**
721    * Notify this meta store that offload of raw data buffers (vertex/edges/
722    * messages) of a particular partition is completed.
723    *
724    * @param partitionId id of the partition that its buffer is offloaded
725    */
726   public void doneOffloadingBuffer(int partitionId) {
727     // Do nothing
728   }
729 
730   /**
731    * Notify this meta store that offload of a partition (partition data and its
732    * current messages) is about to start.
733    *
734    * @param partitionId id of the partition that its data is being offloaded
735    * @return true iff offload of the given partition is viable
736    */
737   public boolean startOffloadingPartition(int partitionId) {
738     MetaPartition meta = partitions.get(partitionId);
739     int owner = getOwnerThreadId(partitionId);
740     synchronized (meta) {
741       if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
742           (meta.getPartitionState() == StorageState.IN_MEM ||
743           meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
744         perThreadPartitionDictionary.get(owner).removePartition(meta);
745         // We may only need to offload either partition or current messages of
746         // that partition to disk. So, if either of the components (partition
747         // or its current messages) is already on disk, we should not update its
748         // metadata.
749         if (meta.getPartitionState() != StorageState.ON_DISK) {
750           meta.setPartitionState(StorageState.IN_TRANSIT);
751         }
752         if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
753           meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
754         }
755         perThreadPartitionDictionary.get(owner).addPartition(meta);
756         return true;
757       } else {
758         return false;
759       }
760     }
761   }
762 
763   /**
764    * Notify this meta store that offload of a partition (partition data and its
765    * current messages) is completed.
766    *
767    * @param partitionId id of the partition that its data is offloaded
768    */
769   public void doneOffloadingPartition(int partitionId) {
770     MetaPartition meta = partitions.get(partitionId);
771     int owner = getOwnerThreadId(partitionId);
772     synchronized (meta) {
773       // We either offload both partition and its messages to disk, or we only
774       // offload one of the components.
775       if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
776           meta.getPartitionState() == StorageState.IN_TRANSIT) {
777         numInMemoryPartitions.getAndDecrement();
778       } else {
779         numPartiallyInMemoryPartitions.getAndDecrement();
780       }
781       perThreadPartitionDictionary.get(owner).removePartition(meta);
782       meta.setPartitionState(StorageState.ON_DISK);
783       meta.setCurrentMessagesState(StorageState.ON_DISK);
784       perThreadPartitionDictionary.get(owner).addPartition(meta);
785     }
786     updateGraphFractionInMemory();
787   }
788 
789   /**
790    * Reset the meta store for a new iteration cycle over all partitions.
791    * Note: this is not thread-safe and should be called from a single thread.
792    */
793   public void resetPartitions() {
794     for (MetaPartition meta : partitions.values()) {
795       int owner = getOwnerThreadId(meta.getPartitionId());
796       perThreadPartitionDictionary.get(owner).removePartition(meta);
797       meta.resetPartition();
798       perThreadPartitionDictionary.get(owner).addPartition(meta);
799     }
800     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
801       dictionary.reset();
802     }
803     numPartitionsProcessed.set(0);
804   }
805 
806   /**
807    * Reset messages in the meta store.
808    * Note: this is not thread-safe and should be called from a single thread.
809    */
810   public void resetMessages() {
811     for (MetaPartition meta : partitions.values()) {
812       int owner = getOwnerThreadId(meta.getPartitionId());
813       perThreadPartitionDictionary.get(owner).removePartition(meta);
814       PartitionStorageState stateBefore = meta.getPartitionStorageState();
815       meta.resetMessages();
816       PartitionStorageState stateAfter = meta.getPartitionStorageState();
817       updateCounters(stateBefore, stateAfter);
818       perThreadPartitionDictionary.get(owner).addPartition(meta);
819     }
820   }
821 
822   /**
823    * Return the id of an unprocessed partition in memory. If all partitions are
824    * processed, return an appropriate 'finisher signal'. If there are
825    * unprocessed partitions, but none are in memory, return null.
826    *
827    * @return id of the partition to be processed next.
828    */
829   public Integer getNextPartition() {
830     if (numPartitionsProcessed.get() >= partitions.size()) {
831       return NO_PARTITION_TO_PROCESS;
832     }
833     int numThreads = perThreadPartitionDictionary.size();
834     int index = randomGenerator.nextInt(numThreads);
835     int startIndex = index;
836     MetaPartition meta;
837     do {
838       // We first look up a partition in the reverse dictionary. If there is a
839       // partition with the given properties, we then check whether we can
840       // return it as the next partition to process. If we cannot, there may
841       // still be other partitions in the dictionary, so we will continue
842       // looping through all of them. If all the partitions with our desired
843       // properties has been examined, we will break the loop.
844       while (true) {
845         meta = perThreadPartitionDictionary.get(index).lookup(
846             ProcessingState.UNPROCESSED,
847             StorageState.IN_MEM,
848             StorageState.IN_MEM,
849             null);
850         if (meta != null) {
851           // Here we should check if the 'meta' still has the same property as
852           // when it was looked up in the dictionary. There may be a case where
853           // meta changes from the time it is looked up until the moment the
854           // synchronize block is granted to progress.
855           synchronized (meta) {
856             if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
857                 meta.getPartitionState() == StorageState.IN_MEM &&
858                 meta.getCurrentMessagesState() == StorageState.IN_MEM) {
859               perThreadPartitionDictionary.get(index).removePartition(meta);
860               meta.setProcessingState(ProcessingState.IN_PROCESS);
861               perThreadPartitionDictionary.get(index).addPartition(meta);
862               return meta.getPartitionId();
863             }
864           }
865         } else {
866           break;
867         }
868       }
869       index = (index + 1) % numThreads;
870     } while (index != startIndex);
871     return null;
872   }
873 
874   /**
875    * Whether a partition is on disk (both its data and its current messages)
876    *
877    * @param partitionId id of the partition to check if it is on disk
878    * @return true if partition data or its current messages are on disk, false
879    *         otherwise
880    */
881   public boolean isPartitionOnDisk(int partitionId) {
882     MetaPartition meta = partitions.get(partitionId);
883     synchronized (meta) {
884       return meta.isOnDisk();
885     }
886   }
887 
888   /**
889    * Representation of meta information of a partition
890    */
891   private static class MetaPartition {
892     /** Id of the partition */
893     private int partitionId;
894     /** Storage state of incoming messages */
895     private StorageState incomingMessagesState;
896     /** Storage state of current messages */
897     private StorageState currentMessagesState;
898     /** Storage state of partition data */
899     private StorageState partitionState;
900     /** Processing state of a partition */
901     private ProcessingState processingState;
902 
903     /**
904      * Constructor
905      *
906      * @param partitionId id of the partition
907      */
908     public MetaPartition(int partitionId) {
909       this.partitionId = partitionId;
910       this.processingState = ProcessingState.UNPROCESSED;
911       this.partitionState = StorageState.IN_MEM;
912       this.currentMessagesState = StorageState.IN_MEM;
913       this.incomingMessagesState = StorageState.IN_MEM;
914     }
915 
916     @Override
917     public String toString() {
918       StringBuffer sb = new StringBuffer();
919       sb.append("\nMetaData: {");
920       sb.append("ID: " + partitionId + "; ");
921       sb.append("Partition: " + partitionState + "; ");
922       sb.append("Current Messages: " + currentMessagesState + "; ");
923       sb.append("Incoming Messages: " + incomingMessagesState + "; ");
924       sb.append("Processed? : " + processingState + "}");
925       return sb.toString();
926     }
927 
928     public int getPartitionId() {
929       return partitionId;
930     }
931 
932     public StorageState getIncomingMessagesState() {
933       return incomingMessagesState;
934     }
935 
936     public void setIncomingMessagesState(StorageState incomingMessagesState) {
937       this.incomingMessagesState = incomingMessagesState;
938     }
939 
940     public StorageState getCurrentMessagesState() {
941       return currentMessagesState;
942     }
943 
944     public void setCurrentMessagesState(StorageState currentMessagesState) {
945       this.currentMessagesState = currentMessagesState;
946     }
947 
948     public StorageState getPartitionState() {
949       return partitionState;
950     }
951 
952     public void setPartitionState(StorageState state) {
953       this.partitionState = state;
954     }
955 
956     public ProcessingState getProcessingState() {
957       return processingState;
958     }
959 
960     public void setProcessingState(ProcessingState processingState) {
961       this.processingState = processingState;
962     }
963 
964     /**
965      * Whether the partition is on disk (either its data or its current
966      * messages)
967      *
968      * @return true if the partition is on disk, false otherwise
969      */
970     public boolean isOnDisk() {
971       return partitionState == StorageState.ON_DISK ||
972           currentMessagesState == StorageState.ON_DISK;
973     }
974 
975     /**
976      * Reset the partition meta information for the next iteration cycle
977      */
978     public void resetPartition() {
979       processingState = ProcessingState.UNPROCESSED;
980     }
981 
982     /**
983      * Reset messages meta information for the next iteration cycle
984      */
985     public void resetMessages() {
986       currentMessagesState = incomingMessagesState;
987       incomingMessagesState = StorageState.IN_MEM;
988     }
989 
990     /**
991      * @return the state of the partition and its current messages as a whole
992      */
993     public PartitionStorageState getPartitionStorageState() {
994       if (partitionState == StorageState.ON_DISK &&
995           currentMessagesState == StorageState.ON_DISK) {
996         return PartitionStorageState.FULLY_ON_DISK;
997       } else if (partitionState == StorageState.IN_MEM &&
998           currentMessagesState == StorageState.IN_MEM) {
999         return PartitionStorageState.FULLY_IN_MEM;
1000       } else {
1001         return PartitionStorageState.PARTIALLY_IN_MEM;
1002       }
1003     }
1004   }
1005 
1006   /**
1007    * Class representing reverse dictionary for partitions. The main operation
1008    * of the reverse dictionary is to lookup for a partition with certain
1009    * properties. The responsibility of keeping the dictionary consistent
1010    * when partition property changes in on the code that changes the property.
1011    * One can simply remove a partition from the dictionary, change the property
1012    * (or properties), and then add the partition to the dictionary.
1013    */
1014   private static class MetaPartitionDictionary {
1015     /**
1016      * Sets of partitions for each possible combination of properties. Each
1017      * partition can have 4 properties, and each property can have any of 3
1018      * different values. The properties are as follows (in the order in which
1019      * it is used as the dimensions of the following 4-D array):
1020      *  - processing status (PROCESSED, UN_PROCESSED, or IN_PROCESS)
1021      *  - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1022      *  - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1023      *  - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1024      */
1025     private final Set<MetaPartition>[][][][] partitions =
1026         (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
1027     /**
1028      * Number of partitions that has been prefetched to be computed in the
1029      * next superstep
1030      */
1031     private final AtomicInteger numPrefetch = new AtomicInteger(0);
1032 
1033     /**
1034      * Constructor
1035      */
1036     public MetaPartitionDictionary() {
1037       for (int i = 0; i < 3; ++i) {
1038         for (int j = 0; j < 3; ++j) {
1039           for (int k = 0; k < 3; ++k) {
1040             for (int t = 0; t < 3; ++t) {
1041               partitions[i][j][k][t] = Sets.newLinkedHashSet();
1042             }
1043           }
1044         }
1045       }
1046     }
1047 
1048     /**
1049      * Get a partition set associated with property combination that a given
1050      * partition has
1051      *
1052      * @param meta meta partition containing properties of a partition
1053      * @return partition set with the same property combination as the given
1054      *         meta partition
1055      */
1056     private Set<MetaPartition> getSet(MetaPartition meta) {
1057       return partitions[meta.getProcessingState().ordinal()]
1058           [meta.getPartitionState().ordinal()]
1059           [meta.getCurrentMessagesState().ordinal()]
1060           [meta.getIncomingMessagesState().ordinal()];
1061     }
1062 
1063     /**
1064      * Add a partition to the dictionary
1065      *
1066      * @param meta meta information of the partition to add
1067      */
1068     public void addPartition(MetaPartition meta) {
1069       Set<MetaPartition> partitionSet = getSet(meta);
1070       synchronized (partitionSet) {
1071         partitionSet.add(meta);
1072       }
1073     }
1074 
1075     /**
1076      * Remove a partition to the dictionary
1077      *
1078      * @param meta meta infomation of the partition to remove
1079      */
1080     public void removePartition(MetaPartition meta) {
1081       Set<MetaPartition> partitionSet = getSet(meta);
1082       synchronized (partitionSet) {
1083         partitionSet.remove(meta);
1084       }
1085     }
1086 
1087     /**
1088      * Lookup for a partition with given properties. One can use wildcard as
1089      * a property in lookup operation (by passing null as the property).
1090      *
1091      * @param processingState processing state property
1092      * @param partitionStorageState partition storage property
1093      * @param currentMessagesState current messages storage property
1094      * @param incomingMessagesState incoming messages storage property
1095      * @return a meta partition in the dictionary with the given combination of
1096      *         properties. If there is no such partition, return null
1097      */
1098     public MetaPartition lookup(ProcessingState processingState,
1099                                 StorageState partitionStorageState,
1100                                 StorageState currentMessagesState,
1101                                 StorageState incomingMessagesState) {
1102       int iStart =
1103           (processingState == null) ? 0 : processingState.ordinal();
1104       int iEnd =
1105           (processingState == null) ? 3 : (processingState.ordinal() + 1);
1106       int jStart =
1107           (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
1108       int jEnd = (partitionStorageState == null) ? 3 :
1109               (partitionStorageState.ordinal() + 1);
1110       int kStart =
1111           (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
1112       int kEnd = (currentMessagesState == null) ? 3 :
1113               (currentMessagesState.ordinal() + 1);
1114       int tStart =
1115           (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
1116       int tEnd = (incomingMessagesState == null) ? 3 :
1117           (incomingMessagesState.ordinal() + 1);
1118       for (int i = iStart; i < iEnd; ++i) {
1119         for (int j = jStart; j < jEnd; ++j) {
1120           for (int k = kStart; k < kEnd; ++k) {
1121             for (int t = tStart; t < tEnd; ++t) {
1122               Set<MetaPartition> partitionSet = partitions[i][j][k][t];
1123               synchronized (partitionSet) {
1124                 MetaPartition meta = peekFromSet(partitionSet);
1125                 if (meta != null) {
1126                   return meta;
1127                 }
1128               }
1129             }
1130           }
1131         }
1132       }
1133       return null;
1134     }
1135 
1136     /**
1137      * Whether there is an in-memory partition that is processed already,
1138      * excluding those partitions that are prefetched
1139      *
1140      * @return true if there is a processed in-memory partition
1141      */
1142     public boolean hasProcessedOnMemory() {
1143       int count = 0;
1144       for (int i = 0; i < 3; ++i) {
1145         for (int j = 0; j < 3; ++j) {
1146           Set<MetaPartition> partitionSet =
1147               partitions[ProcessingState.PROCESSED.ordinal()]
1148                   [StorageState.IN_MEM.ordinal()][i][j];
1149           synchronized (partitionSet) {
1150             count += partitionSet.size();
1151           }
1152         }
1153       }
1154       return count - numPrefetch.get() != 0;
1155     }
1156 
1157     /** Increase number of prefetch-ed partition by 1 */
1158     public void increaseNumPrefetch() {
1159       numPrefetch.getAndIncrement();
1160     }
1161 
1162     /**
1163      * Reset the dictionary preparing it for the next iteration cycle over
1164      * partitions
1165      */
1166     public void reset() {
1167       numPrefetch.set(0);
1168     }
1169   }
1170 }