Type Parameters:
I - Vertex id
V - Vertex data
E - Edge data

public class DiskBackedPartitionStore<I extends org.apache.hadoop.io.WritableComparable,V extends org.apache.hadoop.io.Writable,E extends org.apache.hadoop.io.Writable> extends DiskBackedDataStore<ExtendedDataOutput> implements PartitionStore<I,V,E>

Fields inherited from class DiskBackedDataStore: hasPartitionDataOnFile, MINIMUM_BUFFER_SIZE_TO_FLUSH, oocEngine

| Constructor | Description |
|---|---|
| DiskBackedPartitionStore(PartitionStore<I,V,E> partitionStore, ImmutableClassesGiraphConfiguration<I,V,E> conf, org.apache.hadoop.mapreduce.Mapper.Context context, OutOfCoreEngine oocEngine) | Constructor. |

| Modifier and Type | Method | Description |
|---|---|---|
| protected void | addEntryToInMemoryPartitionData(int partitionId, ExtendedDataOutput vertices) | Adds a single entry for a given partition to the in-memory data store. |
| boolean | addPartition(Partition<I,V,E> partition) | Add a *new* partition to the store. |
| void | addPartitionVertices(Integer partitionId, ExtendedDataOutput extendedDataOutput) | Add vertices to a given partition from a given DataOutput instance. |
| protected int | entrySerializedSize(ExtendedDataOutput vertices) | Gets the size of a given entry in bytes. |
| Partition<I,V,E> | getNextPartition() | Return the next partition in iteration for the current superstep. |
| int | getNumPartitions() | Return the number of stored partitions. |
| long | getPartitionEdgeCount(Integer partitionId) | Return the number of edges in a partition. |
| Iterable<Integer> | getPartitionIds() | Return the ids of all the stored partitions as an Iterable. |
| long | getPartitionVertexCount(Integer partitionId) | Return the number of vertices in a partition. |
| boolean | hasPartition(Integer partitionId) | Whether a specific partition is present in the store. |
| void | initialize() | Called at the beginning of the computation. |
| boolean | isEmpty() | Whether the partition store is empty. |
| protected long | loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) | Loads data of a partition into the data store. |
| long | loadPartitionData(int partitionId) | Loads and assembles all data for a given partition, and puts it into the data store. |
| long | offloadBuffers(int partitionId) | Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk. |
| protected long | offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) | Offloads data of a partition in the data store to disk. |
| long | offloadPartitionData(int partitionId) | Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk. |
| void | putPartition(Partition<I,V,E> partition) | Put a partition back to the store. |
| protected ExtendedDataOutput | readNextEntry(DataInput in) | Reads the next available raw entry from a given input stream. |
| Partition<I,V,E> | removePartition(Integer partitionId) | Remove a partition and return it. |
| void | shutdown() | Called at the end of the computation. |
| void | startIteration() | Start the iteration cycle to iterate over partitions. |
| protected void | writeEntry(ExtendedDataOutput vertices, DataOutput out) | Writes a single raw entry to a given output stream. |

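The three iteration methods in the summary (startIteration(), getNextPartition(), putPartition(Partition)) are documented as one workflow: a single thread starts the iteration, then several threads take partitions and put them back. The sketch below illustrates that pattern only. It does not use Giraph classes: ToyPartition, ToyPartitionStore, and runIteration are hypothetical names invented for this example, and the convention that getNextPartition() returns null once every partition has been handed out is an assumption of the sketch, not something this page states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class IterationSketch {
  /** Hypothetical stand-in for a partition; it only carries an id. */
  static final class ToyPartition {
    final int id;
    ToyPartition(int id) { this.id = id; }
  }

  /** Hypothetical toy store that mimics the three iteration methods. */
  static final class ToyPartitionStore {
    private final List<ToyPartition> all = new ArrayList<>();
    private final Queue<ToyPartition> unprocessed = new ConcurrentLinkedQueue<>();
    private final AtomicInteger putBack = new AtomicInteger();

    ToyPartitionStore(int numPartitions) {
      for (int i = 0; i < numPartitions; i++) {
        all.add(new ToyPartition(i));
      }
    }

    /** Called from a single thread: marks every partition as unprocessed. */
    void startIteration() {
      unprocessed.clear();
      unprocessed.addAll(all);
      putBack.set(0);
    }

    /** Assumption of this sketch: returns null once nothing is left. */
    ToyPartition getNextPartition() { return unprocessed.poll(); }

    /** Records that a partition was handed back after processing. */
    void putPartition(ToyPartition partition) { putBack.incrementAndGet(); }

    int putBackCount() { return putBack.get(); }
  }

  /** Runs one iteration cycle and returns how many partitions were put back. */
  public static int runIteration(int numPartitions, int numThreads) {
    ToyPartitionStore store = new ToyPartitionStore(numPartitions);
    store.startIteration(); // in the main thread

    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    for (int t = 0; t < numThreads; t++) {
      pool.execute(() -> {
        ToyPartition partition;
        while ((partition = store.getNextPartition()) != null) {
          // ... do stuff with partition ...
          store.putPartition(partition);
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return store.putBackCount();
  }

  public static void main(String[] args) {
    System.out.println(runIteration(8, 3)); // prints 8
  }
}
```

Each worker loops until the store has no unprocessed partition left, so every partition is taken exactly once per iteration and put back exactly once, regardless of how many threads participate.
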
Methods inherited from class DiskBackedDataStore: addEntry, getCandidateBuffersToOffload, loadPartitionDataProxy, offloadBuffersProxy, offloadPartitionDataProxy

public DiskBackedPartitionStore(PartitionStore<I,V,E> partitionStore, ImmutableClassesGiraphConfiguration<I,V,E> conf, org.apache.hadoop.mapreduce.Mapper.Context context, OutOfCoreEngine oocEngine)
Constructor.
Parameters:
partitionStore - In-memory partition store that this out-of-core partition store wraps
conf - Configuration
context - Job context
oocEngine - Out-of-core engine

public boolean addPartition(Partition<I,V,E> partition)
Description copied from interface: PartitionStore
Add a *new* partition to the store.
Specified by: addPartition in interface PartitionStore<I,V,E>
Parameters:
partition - Partition to add

public Partition<I,V,E> removePartition(Integer partitionId)
Description copied from interface: PartitionStore
Remove a partition and return it.
Specified by: removePartition in interface PartitionStore<I,V,E>
Parameters:
partitionId - Partition id

public boolean hasPartition(Integer partitionId)
Description copied from interface: PartitionStore
Whether a specific partition is present in the store.
Specified by: hasPartition in interface PartitionStore<I,V,E>
Parameters:
partitionId - Partition id

public Iterable<Integer> getPartitionIds()
Description copied from interface: PartitionStore
Return the ids of all the stored partitions as an Iterable.
Specified by: getPartitionIds in interface PartitionStore<I,V,E>

public int getNumPartitions()
Description copied from interface: PartitionStore
Return the number of stored partitions.
Specified by: getNumPartitions in interface PartitionStore<I,V,E>

public long getPartitionVertexCount(Integer partitionId)
Description copied from interface: PartitionStore
Return the number of vertices in a partition.
Specified by: getPartitionVertexCount in interface PartitionStore<I,V,E>
Parameters:
partitionId - Partition id

public long getPartitionEdgeCount(Integer partitionId)
Description copied from interface: PartitionStore
Return the number of edges in a partition.
Specified by: getPartitionEdgeCount in interface PartitionStore<I,V,E>
Parameters:
partitionId - Partition id

public boolean isEmpty()
Description copied from interface: PartitionStore
Whether the partition store is empty.
Specified by: isEmpty in interface PartitionStore<I,V,E>

public void startIteration()
Description copied from interface: PartitionStore
Start the iteration cycle to iterate over partitions. Once an iteration is started, multiple threads can call PartitionStore.getNextPartition() to iterate over the partitions. Each time PartitionStore.getNextPartition() is called, an unprocessed partition in the current iteration is returned. After processing of the partition is done, the partition should be put back in the store using PartitionStore.putPartition(Partition). Here is an example of the entire workflow:

In the main thread:
  partitionStore.startIteration();

In multiple threads iterating over the partitions:
  Partition partition = partitionStore.getNextPartition();
  ... do stuff with partition ...
  partitionStore.putPartition(partition);

Called from a single thread.
Specified by: startIteration in interface PartitionStore<I,V,E>

public Partition<I,V,E> getNextPartition()
Description copied from interface: PartitionStore
Return the next partition in iteration for the current superstep. The partition must be put back in the store through PartitionStore.putPartition(Partition) after use. Look at comments on PartitionStore.startIteration() for more detail.
Specified by: getNextPartition in interface PartitionStore<I,V,E>

public void putPartition(Partition<I,V,E> partition)
Description copied from interface: PartitionStore
Put a partition back to the store after it has been retrieved through PartitionStore.getNextPartition(). Look at comments on PartitionStore.startIteration() for more detail.
Specified by: putPartition in interface PartitionStore<I,V,E>
Parameters:
partition - Partition

public void addPartitionVertices(Integer partitionId, ExtendedDataOutput extendedDataOutput)
Description copied from interface: PartitionStore
Add vertices to a given partition from a given DataOutput instance.
Specified by: addPartitionVertices in interface PartitionStore<I,V,E>
Parameters:
partitionId - Partition id
extendedDataOutput - Output containing serialized vertex data

public void shutdown()
Description copied from interface: PartitionStore
Called at the end of the computation.
Specified by: shutdown in interface PartitionStore<I,V,E>

public void initialize()
Description copied from interface: PartitionStore
Called at the beginning of the computation.
Specified by: initialize in interface PartitionStore<I,V,E>

protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Description copied from class: DiskBackedDataStore
Loads data of a partition into the data store.
loadInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition whose data is to be loaded
ioThreadId - id of the IO thread performing the load
index - data index chain for the data to load
Throws:
IOException

protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException
Description copied from class: DiskBackedDataStore
Reads the next available raw entry from a given input stream.
readNextEntry in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
in - input stream to read the entry from
Throws:
IOException

protected void addEntryToInMemoryPartitionData(int partitionId, ExtendedDataOutput vertices)
Description copied from class: DiskBackedDataStore
Adds a single entry for a given partition to the in-memory data store.
addEntryToInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition to add the data to
vertices - input entry to add to the data store

public long loadPartitionData(int partitionId) throws IOException
Description copied from class: DiskBackedDataStore
Loads and assembles all data for a given partition, and puts it into the data store.
loadPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition to load and assemble all data for
Throws:
IOException

public long offloadPartitionData(int partitionId) throws IOException
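Description copied from class: DiskBackedDataStore
Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk.
offloadPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition whose data is to be offloaded
Throws:
IOException

protected long offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Description copied from class: DiskBackedDataStore
Offloads data of a partition in the data store to disk.
offloadInMemoryPartitionData in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition to offload to disk
ioThreadId - id of the IO thread performing the offload
index - data index chain for the data to offload
Throws:
IOException

protected void writeEntry(ExtendedDataOutput vertices, DataOutput out) throws IOException
Description copied from class: DiskBackedDataStore
Writes a single raw entry to a given output stream.
writeEntry in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
vertices - entry to write to output
out - output stream to write the entry to
Throws:
IOException

public long offloadBuffers(int partitionId) throws IOException
Description copied from class: DiskBackedDataStore
Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk.
offloadBuffers in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
partitionId - id of the partition whose raw data buffers are to be offloaded
Throws:
IOException

protected int entrySerializedSize(ExtendedDataOutput vertices)
Description copied from class: DiskBackedDataStore
Gets the size of a given entry in bytes.
entrySerializedSize in class DiskBackedDataStore<ExtendedDataOutput>
Parameters:
vertices - input entry whose size is to be computed

Copyright © 2011-2020 The Apache Software Foundation. All Rights Reserved.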