I - Vertex idM - Message datapublic class DiskBackedMessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable> extends DiskBackedDataStore<VertexIdMessages<I,M>> implements MessageStore<I,M>
hasPartitionDataOnFile, MINIMUM_BUFFER_SIZE_TO_FLUSH, oocEngine| Constructor and Description |
|---|
DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I,?,?> config,
OutOfCoreEngine oocEngine,
MessageStore<I,M> messageStore,
boolean useMessageCombiner,
long superstep)
Constructor
|
| Modifier and Type | Method and Description |
|---|---|
protected void |
addEntryToInMemoryPartitionData(int partitionId,
VertexIdMessages<I,M> messages)
Adds a single entry for a given partition to the in-memory data store.
|
void |
addMessage(I vertexId,
M message)
Adds a message for a particular vertex
The method is used by InternalMessageStore to send local messages; for
the general case, use a more efficient addPartitionMessages
|
void |
addPartitionMessages(int partitionId,
VertexIdMessages<I,M> messages)
Adds messages for partition
|
void |
clearAll()
Clears all resources used by this store.
|
void |
clearPartition(int partitionId)
Clears messages for a partition.
|
void |
clearVertexMessages(I vertexId)
Clears messages for a vertex.
|
protected int |
entrySerializedSize(VertexIdMessages<I,M> messages)
Gets the size of a given entry in bytes.
|
void |
finalizeStore()
Called before start of computation in bspworker
Since it is run from a single thread while the store is not being
accessed by any other thread - this is ensured to be thread-safe
|
Iterable<I> |
getPartitionDestinationVertices(int partitionId)
Gets vertex ids from selected partition which we have messages for
|
Iterable<M> |
getVertexMessages(I vertexId)
Gets messages for a vertex.
|
boolean |
hasMessagesForPartition(int partitionId)
Check if we have messages for some partition
|
boolean |
hasMessagesForVertex(I vertexId)
Check if we have messages for some vertex
|
boolean |
isPointerListEncoding()
True if this message-store encodes messages as a list of long pointers
to compact serialized messages
|
protected long |
loadInMemoryPartitionData(int partitionId,
int ioThreadId,
DataIndex index)
Loads data of a partition into data store.
|
long |
loadPartitionData(int partitionId)
Loads and assembles all data for a given partition, and put it into the
data store.
|
long |
offloadBuffers(int partitionId)
Offloads raw data buffers of a given partition to disk, and returns the
number of bytes offloaded from memory to disk.
|
protected long |
offloadInMemoryPartitionData(int partitionId,
int ioThreadId,
DataIndex index)
Offloads data of a partition in data store to disk.
|
long |
offloadPartitionData(int partitionId)
Offloads partition data of a given partition in the data store to disk, and
returns the number of bytes offloaded from memory to disk.
|
void |
readFieldsForPartition(DataInput in,
int partitionId)
Deserialize messages for one partition
|
protected VertexIdMessages<I,M> |
readNextEntry(DataInput in)
Reads the next available raw entry from a given input stream.
|
protected void |
writeEntry(VertexIdMessages<I,M> messages,
DataOutput out)
Writes a single raw entry to a given output stream.
|
void |
writePartition(DataOutput out,
int partitionId)
Serialize messages for one partition.
|
addEntry, getCandidateBuffersToOffload, loadPartitionDataProxy, offloadBuffersProxy, offloadPartitionDataProxypublic DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I,?,?> config, OutOfCoreEngine oocEngine, MessageStore<I,M> messageStore, boolean useMessageCombiner, long superstep)
config - ConfigurationoocEngine - Out-of-core enginemessageStore - In-memory message store for which out-of-core message
store would be wrapperuseMessageCombiner - Whether message combiner is used for this message
storesuperstep - superstep number this messages store is used forpublic boolean isPointerListEncoding()
MessageStoreisPointerListEncoding in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>public Iterable<M> getVertexMessages(I vertexId)
MessageStoregetVertexMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>vertexId - Vertex id for which we want to get messagespublic void clearVertexMessages(I vertexId)
MessageStoreclearVertexMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>vertexId - Vertex id for which we want to clear messagespublic void clearAll()
MessageStoreclearAll in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>public boolean hasMessagesForVertex(I vertexId)
MessageStorehasMessagesForVertex in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>vertexId - Id of vertex which we want to checkpublic boolean hasMessagesForPartition(int partitionId)
MessageStorehasMessagesForPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>partitionId - Id of partition which we want to checkpublic void addPartitionMessages(int partitionId,
VertexIdMessages<I,M> messages)
MessageStoreaddPartitionMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>partitionId - Id of partitionmessages - Collection of vertex ids and messages we want to addpublic void addMessage(I vertexId, M message) throws IOException
MessageStoreaddMessage in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>vertexId - Id of target vertexmessage - A message to sendIOExceptionpublic long loadPartitionData(int partitionId)
throws IOException
DiskBackedDataStoreloadPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to load and assemble all data forIOExceptionpublic long offloadPartitionData(int partitionId)
throws IOException
DiskBackedDataStoreoffloadPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to offload its dataIOExceptionpublic long offloadBuffers(int partitionId)
throws IOException
DiskBackedDataStoreoffloadBuffers in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to offload its raw data buffersIOExceptionpublic void finalizeStore()
MessageStorefinalizeStore in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>public Iterable<I> getPartitionDestinationVertices(int partitionId)
MessageStoregetPartitionDestinationVertices in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>partitionId - Id of partitionpublic void clearPartition(int partitionId)
MessageStoreclearPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>partitionId - Partition id for which we want to clear messagespublic void writePartition(DataOutput out, int partitionId) throws IOException
MessageStorewritePartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>out - DataOutput to serialize this object intopartitionId - Id of partitionIOExceptionpublic void readFieldsForPartition(DataInput in, int partitionId) throws IOException
MessageStorereadFieldsForPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>in - DataInput to deserialize this object
from.partitionId - Id of partitionIOExceptionprotected void writeEntry(VertexIdMessages<I,M> messages, DataOutput out) throws IOException
DiskBackedDataStorewriteEntry in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>messages - entry to write to outputout - output stream to write the entry toIOExceptionprotected VertexIdMessages<I,M> readNextEntry(DataInput in) throws IOException
DiskBackedDataStorereadNextEntry in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>in - input stream to read the entry fromIOExceptionprotected long loadInMemoryPartitionData(int partitionId,
int ioThreadId,
DataIndex index)
throws IOException
DiskBackedDataStoreloadInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to load its dataioThreadId - id of the IO thread performing the loadindex - data index chain for the data to loadIOExceptionprotected long offloadInMemoryPartitionData(int partitionId,
int ioThreadId,
DataIndex index)
throws IOException
DiskBackedDataStoreoffloadInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to offload to diskioThreadId - id of the IO thread performing the offloadindex - data index chain for the data to offloadIOExceptionprotected int entrySerializedSize(VertexIdMessages<I,M> messages)
DiskBackedDataStoreentrySerializedSize in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>messages - input entry to find its sizeprotected void addEntryToInMemoryPartitionData(int partitionId,
VertexIdMessages<I,M> messages)
DiskBackedDataStoreaddEntryToInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>partitionId - id of the partition to add the data tomessages - input entry to add to the data storeCopyright © 2011-2020 The Apache Software Foundation. All Rights Reserved.