This project has retired. For details please refer to its Attic page.
MetaPartitionManager xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import com.google.common.collect.Maps;
22  import com.google.common.collect.Sets;
23  import com.google.common.util.concurrent.AtomicDouble;
24  import org.apache.giraph.bsp.BspService;
25  import org.apache.giraph.ooc.OutOfCoreEngine;
26  import org.apache.giraph.worker.BspServiceWorker;
27  import org.apache.giraph.worker.WorkerProgress;
28  import org.apache.log4j.Logger;
29  
30  import java.util.ArrayList;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static com.google.common.base.Preconditions.checkState;
39  
40  /**
41   * Class to keep meta-information about partition data, edge data, and message
42   * data of each partition on a worker.
43   */
44  public class MetaPartitionManager {
45    /**
46     * Flag indicating that no partition is left to process in the current
47     * iteration cycle over all partitions.
48     */
49    public static final int NO_PARTITION_TO_PROCESS = -1;
50  
51    /** Class logger */
52    private static final Logger LOG =
53        Logger.getLogger(MetaPartitionManager.class);
54    /** Storage states of data; IN_TRANSIT is set while an offload is running */
55    private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
56    /**
57     * Different storage states for a partition as a whole (i.e. the partition
58     * and its current messages)
59     */
60    private enum PartitionStorageState
61      /**
62       * In order: both partition and its current messages are in memory; one
63       * part is in memory and the other is on disk; both are on disk.
64       */
65    { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
66    /**
67     * Different processing states for partitions. Processing states are reset
68     * at the beginning of each iteration cycle over partitions.
69     */
70    private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
71  
72    /**
73     * Number of partitions fully in memory (partition and current messages)
74     */
75    private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
76    /**
77     * Number of partitions that are partially in memory (either the partition or
78     * its current messages is in memory and the other part is not)
79     */
80    private final AtomicInteger numPartiallyInMemoryPartitions =
81        new AtomicInteger(0);
82    /** Map (dictionary) of partition ids to their meta information */
83    private final ConcurrentMap<Integer, MetaPartition> partitions =
84        Maps.newConcurrentMap();
85    /** Reverse dictionaries of partitions assigned to each IO thread */
86    private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
87    /** For each IO thread, set of partition ids that are on-disk and have
88     * 'large enough' vertex/edge buffers to be offloaded on disk
89     */
90    private final List<Set<Integer>> perThreadVertexEdgeBuffers;
91    /**
92     * For each IO thread, set of partition ids that are on-disk and have
93     * 'large enough' message buffers to be offloaded on disk
94     */
95    private final List<Set<Integer>> perThreadMessageBuffers;
96    /**
97     * Out-of-core engine
98     */
99    private final OutOfCoreEngine oocEngine;
100   /**
101    * Number of processed partitions in the current iteration cycle over all
102    * partitions
103    */
104   private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);
105   /**
106    * Random number generator to choose a thread to get one of its partitions for
107    * processing
108    */
109   private final Random randomGenerator;
110   /**
111    * What is the lowest fraction of partitions in memory, relative to the total
112    * number of available partitions? This is an indirect estimation of the
113    * amount of graph in memory, which can be used to estimate how many more
114    * machines are needed to avoid out-of-core execution. At the beginning all
115    * the graph is in memory, so the fraction is 1. This fraction is calculated
116    * per superstep.
117    */
118   private final AtomicDouble lowestGraphFractionInMemory =
119       new AtomicDouble(1);
120   /**
121    * Map of partition ids to their indices. The index of a partition is the order
122    * in which the partition has been inserted. Partitions are indexed as 0, 1,
123    * 2, etc. This indexing is later used to find the id of the IO thread that is
124    * responsible for handling a partition. Partitions are assigned to IO threads
125    * in a round-robin fashion based on their indices.
126    */
127   private final ConcurrentMap<Integer, Integer> partitionIndex =
128       Maps.newConcurrentMap();
129   /**
130    * Sequential counter used to assign indices to partitions as they are added
131    */
132   private final AtomicInteger indexCounter = new AtomicInteger(0);
133   /** How many disks (i.e. IO threads) do we have? */
134   private final int numIOThreads;
135 
136   /**
137    * Constructor
138    *
139    * @param numIOThreads number of IO threads
140    * @param oocEngine out-of-core engine
141    */
142   public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
143     perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
144     perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
145     perThreadMessageBuffers = new ArrayList<>(numIOThreads);
146     for (int i = 0; i < numIOThreads; ++i) {
147       perThreadPartitionDictionary.add(new MetaPartitionDictionary());
148       perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
149       perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
150     }
151     this.oocEngine = oocEngine;
152     this.randomGenerator = new Random();
153     this.numIOThreads = numIOThreads;
154   }
155 
156   /**
157    * @return number of partitions in memory
158    */
159   public int getNumInMemoryPartitions() {
160     return numInMemoryPartitions.get();
161   }
162 
163   /**
164    * @return number of partitions that are partially in memory
165    */
166   public int getNumPartiallyInMemoryPartitions() {
167     return numPartiallyInMemoryPartitions.get();
168   }
169 
170   /**
171    * Get total number of partitions
172    *
173    * @return total number of partitions
174    */
175   public int getNumPartitions() {
176     return partitions.size();
177   }
178 
179   /**
180    * Since the statistics are based on estimates, we assume each partial
181    * partition is taking about half of the full partition in terms of memory
182    * footprint.
183    *
184    * @return estimate of fraction of graph in memory
185    */
186   public double getGraphFractionInMemory() {
187     return (getNumInMemoryPartitions() +
188         getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
189   }
190 
191   /**
192    * Update the lowest fraction of graph in memory so to have a more accurate
193    * information in one of the counters.
194    */
195   private synchronized void updateGraphFractionInMemory() {
196     double graphInMemory = getGraphFractionInMemory();
197     if (graphInMemory < lowestGraphFractionInMemory.get()) {
198       lowestGraphFractionInMemory.set(graphInMemory);
199       WorkerProgress.get().updateLowestGraphPercentageInMemory(
200           (int) (graphInMemory * 100));
201     }
202   }
203 
204   /**
205    * Update the book-keeping about number of in-memory partitions and partially
206    * in-memory partitions with regard to the storage status of the partition and
207    * its current messages before and after an update to its status.
208    *
209    * @param stateBefore the storage state of the partition and its current
210    *                    messages before an update
211    * @param stateAfter the storage state of the partition and its current
212    *                   messages after an update
213    */
214   private void updateCounters(PartitionStorageState stateBefore,
215                               PartitionStorageState stateAfter) {
216     numInMemoryPartitions.getAndAdd(
217         ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
218             ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
219     numPartiallyInMemoryPartitions.getAndAdd(
220         ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
221             ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
222   }
223 
224   /**
225    * Whether a given partition is available
226    *
227    * @param partitionId id of the partition to check if this worker owns it
228    * @return true if the worker owns the partition, false otherwise
229    */
230   public boolean hasPartition(Integer partitionId) {
231     return partitions.containsKey(partitionId);
232   }
233 
234   /**
235    * Return the list of all available partitions as an iterable
236    *
237    * @return list of all available partitions
238    */
239   public Iterable<Integer> getPartitionIds() {
240     return partitions.keySet();
241   }
242 
243   /**
244    * Get the thread id that is responsible for a particular partition
245    *
246    * @param partitionId id of the given partition
247    * @return id of the thread responsible for the given partition
248    */
249   public int getOwnerThreadId(int partitionId) {
250     Integer index = partitionIndex.get(partitionId);
251     checkState(index != null);
252     return index % numIOThreads;
253   }
254 
255   /**
256    * Add a partition
257    *
258    * @param partitionId id of a partition to add
259    */
260   public void addPartition(int partitionId) {
261     MetaPartition meta = new MetaPartition(partitionId);
262     MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
263     // Check if the given partition is new
264     if (temp == null) {
265       int index = indexCounter.getAndIncrement();
266       checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
267       int ownerThread = getOwnerThreadId(partitionId);
268       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
269       numInMemoryPartitions.getAndIncrement();
270     }
271   }
272 
273   /**
274    * Remove a partition. This method assumes that the partition is already
275    * retrieved and is in memory)
276    *
277    * @param partitionId id of a partition to remove
278    */
279   public void removePartition(Integer partitionId) {
280     MetaPartition meta = partitions.remove(partitionId);
281     int ownerThread = getOwnerThreadId(partitionId);
282     perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
283     checkState(!meta.isOnDisk());
284     numInMemoryPartitions.getAndDecrement();
285   }
286 
287   /**
288    * Pops an entry from the specified set.
289    *
290    * @param set set to pop an entry from
291    * @param <T> Type of entries in the set
292    * @return popped entry from the given set
293    */
294   private static <T> T popFromSet(Set<T> set) {
295     if (!set.isEmpty()) {
296       Iterator<T> it = set.iterator();
297       T entry = it.next();
298       it.remove();
299       return entry;
300     }
301     return null;
302   }
303 
304   /**
305    * Peeks an entry from the specified set.
306    *
307    * @param set set to peek an entry from
308    * @param <T> Type of entries in the set
309    * @return peeked entry from the given set
310    */
311   private static <T> T peekFromSet(Set<T> set) {
312     if (!set.isEmpty()) {
313       return set.iterator().next();
314     }
315     return null;
316   }
317 
318   /**
319    * Get id of a partition to offload to disk. Prioritize offloading processed
320    * partitions over unprocessed partition. Also, prioritize offloading
321    * partitions partially in memory over partitions fully in memory.
322    *
323    * @param threadId id of the thread who is going to store the partition on
324    *                 disk
325    * @return id of the partition to offload on disk
326    */
327   public Integer getOffloadPartitionId(int threadId) {
328     // First, look for a processed partition partially on disk
329     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
330         ProcessingState.PROCESSED,
331         StorageState.IN_MEM,
332         StorageState.ON_DISK,
333         null);
334     if (meta != null) {
335       return meta.getPartitionId();
336     }
337     meta = perThreadPartitionDictionary.get(threadId).lookup(
338         ProcessingState.PROCESSED,
339         StorageState.ON_DISK,
340         StorageState.IN_MEM,
341         null);
342     if (meta != null) {
343       return meta.getPartitionId();
344     }
345     // Second, look for a processed partition entirely in memory
346     meta = perThreadPartitionDictionary.get(threadId).lookup(
347         ProcessingState.PROCESSED,
348         StorageState.IN_MEM,
349         StorageState.IN_MEM,
350         null);
351     if (meta != null) {
352       return meta.getPartitionId();
353     }
354 
355     // Third, look for an unprocessed partition partially on disk
356     meta = perThreadPartitionDictionary.get(threadId).lookup(
357         ProcessingState.UNPROCESSED,
358         StorageState.IN_MEM,
359         StorageState.ON_DISK,
360         null);
361     if (meta != null) {
362       return meta.getPartitionId();
363     }
364     meta = perThreadPartitionDictionary.get(threadId).lookup(
365         ProcessingState.UNPROCESSED,
366         StorageState.ON_DISK,
367         StorageState.IN_MEM,
368         null);
369     if (meta != null) {
370       return meta.getPartitionId();
371     }
372     // Forth, look for an unprocessed partition entirely in memory
373     meta = perThreadPartitionDictionary.get(threadId).lookup(
374         ProcessingState.UNPROCESSED,
375         StorageState.IN_MEM,
376         StorageState.IN_MEM,
377         null);
378     if (meta != null) {
379       return meta.getPartitionId();
380     }
381     return null;
382   }
383 
384   /**
385    * Get id of a partition to offload its vertex/edge buffers on disk
386    *
387    * @param threadId id of the thread who is going to store the buffers on disk
388    * @return id of the partition to offload its vertex/edge buffers on disk
389    */
390   public Integer getOffloadPartitionBufferId(int threadId) {
391     if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
392       Integer partitionId =
393           popFromSet(perThreadVertexEdgeBuffers.get(threadId));
394       if (partitionId == null) {
395         DiskBackedPartitionStore<?, ?, ?> partitionStore =
396             (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
397                 .getPartitionStore());
398         perThreadVertexEdgeBuffers.get(threadId)
399             .addAll(partitionStore.getCandidateBuffersToOffload(threadId));
400         DiskBackedEdgeStore<?, ?, ?> edgeStore =
401             (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
402                 .getEdgeStore();
403         perThreadVertexEdgeBuffers.get(threadId)
404             .addAll(edgeStore.getCandidateBuffersToOffload(threadId));
405         partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
406       }
407       return partitionId;
408     }
409     return null;
410   }
411 
412   /**
413    * Get id of a partition to offload its incoming message buffers on disk
414    *
415    * @param threadId id of the thread who is going to store the buffers on disk
416    * @return id of the partition to offload its message buffer on disk
417    */
418   public Integer getOffloadMessageBufferId(int threadId) {
419     if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
420       Integer partitionId =
421           popFromSet(perThreadMessageBuffers.get(threadId));
422       if (partitionId == null) {
423         DiskBackedMessageStore<?, ?> messageStore =
424             (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
425                 .getIncomingMessageStore());
426         if (messageStore != null) {
427           perThreadMessageBuffers.get(threadId)
428               .addAll(messageStore.getCandidateBuffersToOffload(threadId));
429           partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
430         }
431       }
432       return partitionId;
433     }
434     return null;
435   }
436 
437   /**
438    * Get id of a partition to offload its incoming message on disk. Prioritize
439    * offloading messages of partitions already on disk, and then partitions
440    * in-transit, over partitions in-memory. Also, prioritize processed
441    * partitions over unprocessed (processed partitions would go on disk with
442    * more chances that unprocessed partitions)
443    *
444    * @param threadId id of the thread who is going to store the incoming
445    *                 messages on disk
446    * @return id of the partition to offload its message on disk
447    */
448   public Integer getOffloadMessageId(int threadId) {
449     if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
450       return null;
451     }
452     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
453         ProcessingState.PROCESSED,
454         StorageState.ON_DISK,
455         null,
456         StorageState.IN_MEM);
457     if (meta != null) {
458       return meta.getPartitionId();
459     }
460     meta = perThreadPartitionDictionary.get(threadId).lookup(
461         ProcessingState.PROCESSED,
462         StorageState.IN_TRANSIT,
463         null,
464         StorageState.IN_MEM);
465     if (meta != null) {
466       return meta.getPartitionId();
467     }
468     meta = perThreadPartitionDictionary.get(threadId).lookup(
469         ProcessingState.UNPROCESSED,
470         StorageState.ON_DISK,
471         null,
472         StorageState.IN_MEM);
473     if (meta != null) {
474       return meta.getPartitionId();
475     }
476     meta = perThreadPartitionDictionary.get(threadId).lookup(
477         ProcessingState.UNPROCESSED,
478         StorageState.IN_TRANSIT,
479         null,
480         StorageState.IN_MEM);
481     if (meta != null) {
482       return meta.getPartitionId();
483     }
484     return null;
485   }
486 
487   /**
488    * Get id of a partition to load its data to memory. Prioritize loading an
489    * unprocessed partition over loading processed partition. Also, prioritize
490    * loading a partition partially in memory over partitions entirely on disk.
491    *
492    * @param threadId id of the thread who is going to load the partition data
493    * @return id of the partition to load its data to memory
494    */
495   public Integer getLoadPartitionId(int threadId) {
496     // First, look for an unprocessed partition partially in memory
497     MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
498         ProcessingState.UNPROCESSED,
499         StorageState.IN_MEM,
500         StorageState.ON_DISK,
501         null);
502     if (meta != null) {
503       return meta.getPartitionId();
504     }
505 
506     meta = perThreadPartitionDictionary.get(threadId).lookup(
507         ProcessingState.UNPROCESSED,
508         StorageState.ON_DISK,
509         StorageState.IN_MEM,
510         null);
511     if (meta != null) {
512       return meta.getPartitionId();
513     }
514 
515     // Second, look for an unprocessed partition entirely on disk
516     meta = perThreadPartitionDictionary.get(threadId).lookup(
517         ProcessingState.UNPROCESSED,
518         StorageState.ON_DISK,
519         StorageState.ON_DISK,
520         null);
521     if (meta != null) {
522       return meta.getPartitionId();
523     }
524 
525     // Third, look for a processed partition partially in memory
526     meta = perThreadPartitionDictionary.get(threadId).lookup(
527         ProcessingState.PROCESSED,
528         StorageState.IN_MEM,
529         null,
530         StorageState.ON_DISK);
531     if (meta != null) {
532       return meta.getPartitionId();
533     }
534 
535     meta = perThreadPartitionDictionary.get(threadId).lookup(
536         ProcessingState.PROCESSED,
537         StorageState.ON_DISK,
538         null,
539         StorageState.IN_MEM);
540     if (meta != null) {
541       return meta.getPartitionId();
542     }
543 
544     meta = perThreadPartitionDictionary.get(threadId).lookup(
545         ProcessingState.PROCESSED,
546         StorageState.ON_DISK,
547         null,
548         StorageState.ON_DISK);
549     if (meta != null) {
550       return meta.getPartitionId();
551     }
552 
553     return null;
554   }
555 
556   /**
557    * Mark a partition as being 'IN_PROCESS'
558    *
559    * @param partitionId id of the partition to mark
560    */
561   public void markPartitionAsInProcess(int partitionId) {
562     MetaPartition meta = partitions.get(partitionId);
563     int ownerThread = getOwnerThreadId(partitionId);
564     synchronized (meta) {
565       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
566       meta.setProcessingState(ProcessingState.IN_PROCESS);
567       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
568     }
569   }
570 
571   /**
572    * Whether there is any processed partition stored in memory (excluding those
573    * that are prefetched to execute in the next superstep).
574    *
575    * @return true iff there is any processed partition in memory
576    */
577   public boolean hasProcessedOnMemory() {
578     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
579       if (dictionary.hasProcessedOnMemory()) {
580         return true;
581       }
582     }
583     return false;
584   }
585 
586   /**
587    * Whether a partition is *processed* in the current iteration cycle over
588    * partitions.
589    *
590    * @param partitionId id of the partition to check
591    * @return true iff processing the given partition is done
592    */
593   public boolean isPartitionProcessed(Integer partitionId) {
594     MetaPartition meta = partitions.get(partitionId);
595     synchronized (meta) {
596       return meta.getProcessingState() == ProcessingState.PROCESSED;
597     }
598   }
599 
600   /**
601    * Mark a partition as 'PROCESSED'
602    *
603    * @param partitionId id of the partition to mark
604    */
605   public void setPartitionIsProcessed(int partitionId) {
606     MetaPartition meta = partitions.get(partitionId);
607     int ownerThread = getOwnerThreadId(partitionId);
608     synchronized (meta) {
609       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
610       meta.setProcessingState(ProcessingState.PROCESSED);
611       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
612     }
613     numPartitionsProcessed.getAndIncrement();
614   }
615 
616   /**
617    * Notify this meta store that load of a partition for a specific superstep
618    * is about to start.
619    *
620    * @param partitionId id of the partition to load to memory
621    * @param superstep superstep in which the partition is needed for
622    * @return true iff load of the given partition is viable
623    */
624   public boolean startLoadingPartition(int partitionId, long superstep) {
625     MetaPartition meta = partitions.get(partitionId);
626     synchronized (meta) {
627       boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
628       if (superstep == oocEngine.getSuperstep()) {
629         shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
630       } else {
631         shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
632       }
633       return shouldLoad;
634     }
635   }
636 
637   /**
638    * Notify this meta store that load of a partition for a specific superstep
639    * is completed
640    *
641    * @param partitionId id of the partition for which the load is completed
642    * @param superstep superstep in which the partition is loaded for
643    */
644   public void doneLoadingPartition(int partitionId, long superstep) {
645     MetaPartition meta = partitions.get(partitionId);
646     int owner = getOwnerThreadId(partitionId);
647     synchronized (meta) {
648       PartitionStorageState stateBefore = meta.getPartitionStorageState();
649       perThreadPartitionDictionary.get(owner).removePartition(meta);
650       meta.setPartitionState(StorageState.IN_MEM);
651       if (superstep == oocEngine.getSuperstep()) {
652         meta.setCurrentMessagesState(StorageState.IN_MEM);
653       } else {
654         meta.setIncomingMessagesState(StorageState.IN_MEM);
655       }
656       PartitionStorageState stateAfter = meta.getPartitionStorageState();
657       updateCounters(stateBefore, stateAfter);
658       // Check whether load was to prefetch a partition from disk to memory for
659       // the next superstep
660       if (meta.getProcessingState() == ProcessingState.PROCESSED) {
661         perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
662       }
663       perThreadPartitionDictionary.get(owner).addPartition(meta);
664     }
665     updateGraphFractionInMemory();
666   }
667 
668   /**
669    * Notify this meta store that offload of messages for a particular partition
670    * is about to start.
671    *
672    * @param partitionId id of the partition that its messages is being offloaded
673    * @return true iff offload of messages of the given partition is viable
674    */
675   public boolean startOffloadingMessages(int partitionId) {
676     MetaPartition meta = partitions.get(partitionId);
677     int ownerThread = getOwnerThreadId(partitionId);
678     synchronized (meta) {
679       if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
680         perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
681         meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
682         perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
683         return true;
684       } else {
685         return false;
686       }
687     }
688   }
689 
690   /**
691    * Notify this meta store that offload of messages for a particular partition
692    * is complete.
693    *
694    * @param partitionId id of the partition that its messages is offloaded to
695    *                    disk
696    */
697   public void doneOffloadingMessages(int partitionId) {
698     MetaPartition meta = partitions.get(partitionId);
699     int ownerThread = getOwnerThreadId(partitionId);
700     synchronized (meta) {
701       perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
702       meta.setIncomingMessagesState(StorageState.ON_DISK);
703       perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
704     }
705   }
706 
707   /**
708    * Notify this meta store that offload of raw data buffers (vertex/edges/
709    * messages) of a particular partition is about to start.
710    *
711    * @param partitionId id of the partition that its buffer is being offloaded
712    * @return true iff offload of buffers of the given partition is viable
713    */
714   public boolean startOffloadingBuffer(int partitionId) {
715     // Do nothing
716     return true;
717   }
718 
719   /**
720    * Notify this meta store that offload of raw data buffers (vertex/edges/
721    * messages) of a particular partition is completed.
722    *
723    * @param partitionId id of the partition that its buffer is offloaded
724    */
725   public void doneOffloadingBuffer(int partitionId) {
726     // Do nothing
727   }
728 
729   /**
730    * Notify this meta store that offload of a partition (partition data and its
731    * current messages) is about to start.
732    *
733    * @param partitionId id of the partition that its data is being offloaded
734    * @return true iff offload of the given partition is viable
735    */
736   public boolean startOffloadingPartition(int partitionId) {
737     MetaPartition meta = partitions.get(partitionId);
738     int owner = getOwnerThreadId(partitionId);
739     synchronized (meta) {
740       if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
741           (meta.getPartitionState() == StorageState.IN_MEM ||
742           meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
743         perThreadPartitionDictionary.get(owner).removePartition(meta);
744         // We may only need to offload either partition or current messages of
745         // that partition to disk. So, if either of the components (partition
746         // or its current messages) is already on disk, we should not update its
747         // metadata.
748         if (meta.getPartitionState() != StorageState.ON_DISK) {
749           meta.setPartitionState(StorageState.IN_TRANSIT);
750         }
751         if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
752           meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
753         }
754         perThreadPartitionDictionary.get(owner).addPartition(meta);
755         return true;
756       } else {
757         return false;
758       }
759     }
760   }
761 
762   /**
763    * Notify this meta store that offload of a partition (partition data and its
764    * current messages) is completed.
765    *
766    * @param partitionId id of the partition that its data is offloaded
767    */
768   public void doneOffloadingPartition(int partitionId) {
769     MetaPartition meta = partitions.get(partitionId);
770     int owner = getOwnerThreadId(partitionId);
771     synchronized (meta) {
772       // We either offload both partition and its messages to disk, or we only
773       // offload one of the components.
774       if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
775           meta.getPartitionState() == StorageState.IN_TRANSIT) {
776         numInMemoryPartitions.getAndDecrement();
777       } else {
778         numPartiallyInMemoryPartitions.getAndDecrement();
779       }
780       perThreadPartitionDictionary.get(owner).removePartition(meta);
781       meta.setPartitionState(StorageState.ON_DISK);
782       meta.setCurrentMessagesState(StorageState.ON_DISK);
783       perThreadPartitionDictionary.get(owner).addPartition(meta);
784     }
785     updateGraphFractionInMemory();
786   }
787 
788   /**
789    * Reset the meta store for a new iteration cycle over all partitions.
790    * Note: this is not thread-safe and should be called from a single thread.
791    */
792   public void resetPartitions() {
793     for (MetaPartition meta : partitions.values()) {
794       int owner = getOwnerThreadId(meta.getPartitionId());
795       perThreadPartitionDictionary.get(owner).removePartition(meta);
796       meta.resetPartition();
797       perThreadPartitionDictionary.get(owner).addPartition(meta);
798     }
799     for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
800       dictionary.reset();
801     }
802     numPartitionsProcessed.set(0);
803   }
804 
805   /**
806    * Reset messages in the meta store.
807    * Note: this is not thread-safe and should be called from a single thread.
808    */
809   public void resetMessages() {
810     for (MetaPartition meta : partitions.values()) {
811       int owner = getOwnerThreadId(meta.getPartitionId());
812       perThreadPartitionDictionary.get(owner).removePartition(meta);
813       PartitionStorageState stateBefore = meta.getPartitionStorageState();
814       meta.resetMessages();
815       PartitionStorageState stateAfter = meta.getPartitionStorageState();
816       updateCounters(stateBefore, stateAfter);
817       perThreadPartitionDictionary.get(owner).addPartition(meta);
818     }
819   }
820 
821   /**
822    * Return the id of an unprocessed partition in memory. If all partitions are
823    * processed, return an appropriate 'finisher signal'. If there are
824    * unprocessed partitions, but none are in memory, return null.
825    *
826    * @return id of the partition to be processed next.
827    */
828   public Integer getNextPartition() {
829     if (numPartitionsProcessed.get() >= partitions.size()) {
830       return NO_PARTITION_TO_PROCESS;
831     }
832     int numThreads = perThreadPartitionDictionary.size();
833     int index = randomGenerator.nextInt(numThreads);
834     int startIndex = index;
835     MetaPartition meta;
836     do {
837       // We first look up a partition in the reverse dictionary. If there is a
838       // partition with the given properties, we then check whether we can
839       // return it as the next partition to process. If we cannot, there may
840       // still be other partitions in the dictionary, so we will continue
841       // looping through all of them. If all the partitions with our desired
842       // properties has been examined, we will break the loop.
843       while (true) {
844         meta = perThreadPartitionDictionary.get(index).lookup(
845             ProcessingState.UNPROCESSED,
846             StorageState.IN_MEM,
847             StorageState.IN_MEM,
848             null);
849         if (meta != null) {
850           // Here we should check if the 'meta' still has the same property as
851           // when it was looked up in the dictionary. There may be a case where
852           // meta changes from the time it is looked up until the moment the
853           // synchronize block is granted to progress.
854           synchronized (meta) {
855             if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
856                 meta.getPartitionState() == StorageState.IN_MEM &&
857                 meta.getCurrentMessagesState() == StorageState.IN_MEM) {
858               perThreadPartitionDictionary.get(index).removePartition(meta);
859               meta.setProcessingState(ProcessingState.IN_PROCESS);
860               perThreadPartitionDictionary.get(index).addPartition(meta);
861               return meta.getPartitionId();
862             }
863           }
864         } else {
865           break;
866         }
867       }
868       index = (index + 1) % numThreads;
869     } while (index != startIndex);
870     return null;
871   }
872 
873   /**
874    * Whether a partition is on disk (both its data and its current messages)
875    *
876    * @param partitionId id of the partition to check if it is on disk
877    * @return true if partition data or its current messages are on disk, false
878    *         otherwise
879    */
880   public boolean isPartitionOnDisk(int partitionId) {
881     MetaPartition meta = partitions.get(partitionId);
882     synchronized (meta) {
883       return meta.isOnDisk();
884     }
885   }
886 
887   /**
888    * Representation of meta information of a partition
889    */
890   private static class MetaPartition {
891     /** Id of the partition */
892     private int partitionId;
893     /** Storage state of incoming messages */
894     private StorageState incomingMessagesState;
895     /** Storage state of current messages */
896     private StorageState currentMessagesState;
897     /** Storage state of partition data */
898     private StorageState partitionState;
899     /** Processing state of a partition */
900     private ProcessingState processingState;
901 
902     /**
903      * Constructor
904      *
905      * @param partitionId id of the partition
906      */
907     public MetaPartition(int partitionId) {
908       this.partitionId = partitionId;
909       this.processingState = ProcessingState.UNPROCESSED;
910       this.partitionState = StorageState.IN_MEM;
911       this.currentMessagesState = StorageState.IN_MEM;
912       this.incomingMessagesState = StorageState.IN_MEM;
913     }
914 
915     @Override
916     public String toString() {
917       StringBuffer sb = new StringBuffer();
918       sb.append("\nMetaData: {");
919       sb.append("ID: " + partitionId + "; ");
920       sb.append("Partition: " + partitionState + "; ");
921       sb.append("Current Messages: " + currentMessagesState + "; ");
922       sb.append("Incoming Messages: " + incomingMessagesState + "; ");
923       sb.append("Processed? : " + processingState + "}");
924       return sb.toString();
925     }
926 
927     public int getPartitionId() {
928       return partitionId;
929     }
930 
931     public StorageState getIncomingMessagesState() {
932       return incomingMessagesState;
933     }
934 
935     public void setIncomingMessagesState(StorageState incomingMessagesState) {
936       this.incomingMessagesState = incomingMessagesState;
937     }
938 
939     public StorageState getCurrentMessagesState() {
940       return currentMessagesState;
941     }
942 
943     public void setCurrentMessagesState(StorageState currentMessagesState) {
944       this.currentMessagesState = currentMessagesState;
945     }
946 
947     public StorageState getPartitionState() {
948       return partitionState;
949     }
950 
951     public void setPartitionState(StorageState state) {
952       this.partitionState = state;
953     }
954 
955     public ProcessingState getProcessingState() {
956       return processingState;
957     }
958 
959     public void setProcessingState(ProcessingState processingState) {
960       this.processingState = processingState;
961     }
962 
963     /**
964      * Whether the partition is on disk (either its data or its current
965      * messages)
966      *
967      * @return true if the partition is on disk, false otherwise
968      */
969     public boolean isOnDisk() {
970       return partitionState == StorageState.ON_DISK ||
971           currentMessagesState == StorageState.ON_DISK;
972     }
973 
974     /**
975      * Reset the partition meta information for the next iteration cycle
976      */
977     public void resetPartition() {
978       processingState = ProcessingState.UNPROCESSED;
979     }
980 
981     /**
982      * Reset messages meta information for the next iteration cycle
983      */
984     public void resetMessages() {
985       currentMessagesState = incomingMessagesState;
986       incomingMessagesState = StorageState.IN_MEM;
987     }
988 
989     /**
990      * @return the state of the partition and its current messages as a whole
991      */
992     public PartitionStorageState getPartitionStorageState() {
993       if (partitionState == StorageState.ON_DISK &&
994           currentMessagesState == StorageState.ON_DISK) {
995         return PartitionStorageState.FULLY_ON_DISK;
996       } else if (partitionState == StorageState.IN_MEM &&
997           currentMessagesState == StorageState.IN_MEM) {
998         return PartitionStorageState.FULLY_IN_MEM;
999       } else {
1000         return PartitionStorageState.PARTIALLY_IN_MEM;
1001       }
1002     }
1003   }
1004 
1005   /**
1006    * Class representing reverse dictionary for partitions. The main operation
1007    * of the reverse dictionary is to lookup for a partition with certain
1008    * properties. The responsibility of keeping the dictionary consistent
1009    * when partition property changes in on the code that changes the property.
1010    * One can simply remove a partition from the dictionary, change the property
1011    * (or properties), and then add the partition to the dictionary.
1012    */
1013   private static class MetaPartitionDictionary {
1014     /**
1015      * Sets of partitions for each possible combination of properties. Each
1016      * partition can have 4 properties, and each property can have any of 3
1017      * different values. The properties are as follows (in the order in which
1018      * it is used as the dimensions of the following 4-D array):
1019      *  - processing status (PROCESSED, UN_PROCESSED, or IN_PROCESS)
1020      *  - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1021      *  - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1022      *  - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
1023      */
1024     private final Set<MetaPartition>[][][][] partitions =
1025         (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
1026     /**
1027      * Number of partitions that has been prefetched to be computed in the
1028      * next superstep
1029      */
1030     private final AtomicInteger numPrefetch = new AtomicInteger(0);
1031 
1032     /**
1033      * Constructor
1034      */
1035     public MetaPartitionDictionary() {
1036       for (int i = 0; i < 3; ++i) {
1037         for (int j = 0; j < 3; ++j) {
1038           for (int k = 0; k < 3; ++k) {
1039             for (int t = 0; t < 3; ++t) {
1040               partitions[i][j][k][t] = Sets.newLinkedHashSet();
1041             }
1042           }
1043         }
1044       }
1045     }
1046 
1047     /**
1048      * Get a partition set associated with property combination that a given
1049      * partition has
1050      *
1051      * @param meta meta partition containing properties of a partition
1052      * @return partition set with the same property combination as the given
1053      *         meta partition
1054      */
1055     private Set<MetaPartition> getSet(MetaPartition meta) {
1056       return partitions[meta.getProcessingState().ordinal()]
1057           [meta.getPartitionState().ordinal()]
1058           [meta.getCurrentMessagesState().ordinal()]
1059           [meta.getIncomingMessagesState().ordinal()];
1060     }
1061 
1062     /**
1063      * Add a partition to the dictionary
1064      *
1065      * @param meta meta information of the partition to add
1066      */
1067     public void addPartition(MetaPartition meta) {
1068       Set<MetaPartition> partitionSet = getSet(meta);
1069       synchronized (partitionSet) {
1070         partitionSet.add(meta);
1071       }
1072     }
1073 
1074     /**
1075      * Remove a partition to the dictionary
1076      *
1077      * @param meta meta infomation of the partition to remove
1078      */
1079     public void removePartition(MetaPartition meta) {
1080       Set<MetaPartition> partitionSet = getSet(meta);
1081       synchronized (partitionSet) {
1082         partitionSet.remove(meta);
1083       }
1084     }
1085 
1086     /**
1087      * Lookup for a partition with given properties. One can use wildcard as
1088      * a property in lookup operation (by passing null as the property).
1089      *
1090      * @param processingState processing state property
1091      * @param partitionStorageState partition storage property
1092      * @param currentMessagesState current messages storage property
1093      * @param incomingMessagesState incoming messages storage property
1094      * @return a meta partition in the dictionary with the given combination of
1095      *         properties. If there is no such partition, return null
1096      */
1097     public MetaPartition lookup(ProcessingState processingState,
1098                                 StorageState partitionStorageState,
1099                                 StorageState currentMessagesState,
1100                                 StorageState incomingMessagesState) {
1101       int iStart =
1102           (processingState == null) ? 0 : processingState.ordinal();
1103       int iEnd =
1104           (processingState == null) ? 3 : (processingState.ordinal() + 1);
1105       int jStart =
1106           (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
1107       int jEnd = (partitionStorageState == null) ? 3 :
1108               (partitionStorageState.ordinal() + 1);
1109       int kStart =
1110           (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
1111       int kEnd = (currentMessagesState == null) ? 3 :
1112               (currentMessagesState.ordinal() + 1);
1113       int tStart =
1114           (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
1115       int tEnd = (incomingMessagesState == null) ? 3 :
1116           (incomingMessagesState.ordinal() + 1);
1117       for (int i = iStart; i < iEnd; ++i) {
1118         for (int j = jStart; j < jEnd; ++j) {
1119           for (int k = kStart; k < kEnd; ++k) {
1120             for (int t = tStart; t < tEnd; ++t) {
1121               Set<MetaPartition> partitionSet = partitions[i][j][k][t];
1122               synchronized (partitionSet) {
1123                 MetaPartition meta = peekFromSet(partitionSet);
1124                 if (meta != null) {
1125                   return meta;
1126                 }
1127               }
1128             }
1129           }
1130         }
1131       }
1132       return null;
1133     }
1134 
1135     /**
1136      * Whether there is an in-memory partition that is processed already,
1137      * excluding those partitions that are prefetched
1138      *
1139      * @return true if there is a processed in-memory partition
1140      */
1141     public boolean hasProcessedOnMemory() {
1142       int count = 0;
1143       for (int i = 0; i < 3; ++i) {
1144         for (int j = 0; j < 3; ++j) {
1145           Set<MetaPartition> partitionSet =
1146               partitions[ProcessingState.PROCESSED.ordinal()]
1147                   [StorageState.IN_MEM.ordinal()][i][j];
1148           synchronized (partitionSet) {
1149             count += partitionSet.size();
1150           }
1151         }
1152       }
1153       return count - numPrefetch.get() != 0;
1154     }
1155 
1156     /** Increase number of prefetch-ed partition by 1 */
1157     public void increaseNumPrefetch() {
1158       numPrefetch.getAndIncrement();
1159     }
1160 
1161     /**
1162      * Reset the dictionary preparing it for the next iteration cycle over
1163      * partitions
1164      */
1165     public void reset() {
1166       numPrefetch.set(0);
1167     }
1168   }
1169 }