I - Vertex id
M - Message data

public class DiskBackedMessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable> extends DiskBackedDataStore<VertexIdMessages<I,M>> implements MessageStore<I,M>
Fields inherited from class DiskBackedDataStore: hasPartitionDataOnFile, MINIMUM_BUFFER_SIZE_TO_FLUSH, oocEngine
Constructor and Description |
---|
DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I,?,?> config, OutOfCoreEngine oocEngine, MessageStore<I,M> messageStore, boolean useMessageCombiner, long superstep) Constructor |
Modifier and Type | Method and Description |
---|---|
protected void | addEntryToInMemoryPartitionData(int partitionId, VertexIdMessages<I,M> messages) Adds a single entry for a given partition to the in-memory data store. |
void | addMessage(I vertexId, M message) Adds a message for a particular vertex. The method is used by InternalMessageStore to send local messages; for the general case, use the more efficient addPartitionMessages. |
void | addPartitionMessages(int partitionId, VertexIdMessages<I,M> messages) Adds messages for a partition. |
void | clearAll() Clears all resources used by this store. |
void | clearPartition(int partitionId) Clears messages for a partition. |
void | clearVertexMessages(I vertexId) Clears messages for a vertex. |
protected int | entrySerializedSize(VertexIdMessages<I,M> messages) Gets the size of a given entry in bytes. |
void | finalizeStore() Called before start of computation in bspworker. Since it is run from a single thread while the store is not being accessed by any other thread, this is ensured to be thread-safe. |
Iterable<I> | getPartitionDestinationVertices(int partitionId) Gets the vertex ids from the selected partition for which we have messages. |
Iterable<M> | getVertexMessages(I vertexId) Gets messages for a vertex. |
boolean | hasMessagesForPartition(int partitionId) Checks if we have messages for some partition. |
boolean | hasMessagesForVertex(I vertexId) Checks if we have messages for some vertex. |
boolean | isPointerListEncoding() True if this message-store encodes messages as a list of long pointers to compact serialized messages. |
protected long | loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) Loads data of a partition into the data store. |
long | loadPartitionData(int partitionId) Loads and assembles all data for a given partition, and puts it into the data store. |
long | offloadBuffers(int partitionId) Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk. |
protected long | offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) Offloads data of a partition in the data store to disk. |
long | offloadPartitionData(int partitionId) Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk. |
void | readFieldsForPartition(DataInput in, int partitionId) Deserializes messages for one partition. |
protected VertexIdMessages<I,M> | readNextEntry(DataInput in) Reads the next available raw entry from a given input stream. |
protected void | writeEntry(VertexIdMessages<I,M> messages, DataOutput out) Writes a single raw entry to a given output stream. |
void | writePartition(DataOutput out, int partitionId) Serializes messages for one partition. |
Methods inherited from class DiskBackedDataStore: addEntry, getCandidateBuffersToOffload, loadPartitionDataProxy, offloadBuffersProxy, offloadPartitionDataProxy
public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I,?,?> config, OutOfCoreEngine oocEngine, MessageStore<I,M> messageStore, boolean useMessageCombiner, long superstep)
Constructor
config - Configuration
oocEngine - Out-of-core engine
messageStore - In-memory message store that this out-of-core message store wraps
useMessageCombiner - Whether a message combiner is used for this message store
superstep - Superstep number this message store is used for

public boolean isPointerListEncoding()
True if this message-store encodes messages as a list of long pointers to compact serialized messages.
isPointerListEncoding in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
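To make the idea of pointer-list encoding concrete, the following is a minimal sketch under stated assumptions, not Giraph's implementation. `ToyPointerListStore`, its `String` messages, and the use of plain byte offsets as "pointers" are all hypothetical choices made for illustration: messages are serialized back to back in one compact buffer, and each vertex keeps only a list of `long` offsets into it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of pointer-list encoding; not a Giraph class. */
class ToyPointerListStore {
    // All messages live back to back in one compact serialized buffer.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(buffer);
    // Each vertex keeps only long pointers (byte offsets) into that buffer.
    private final Map<Long, List<Long>> pointers = new HashMap<>();

    void addMessage(long vertexId, String message) {
        long offset = out.size(); // bytes written so far = start of this message
        try {
            out.writeUTF(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        pointers.computeIfAbsent(vertexId, v -> new ArrayList<>()).add(offset);
    }

    /** Follows each pointer and deserializes the message stored at that offset. */
    List<String> getVertexMessages(long vertexId) {
        byte[] raw = buffer.toByteArray();
        List<String> messages = new ArrayList<>();
        for (long offset : pointersOf(vertexId)) {
            int start = (int) offset;
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(raw, start, raw.length - start))) {
                messages.add(in.readUTF());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return messages;
    }

    List<Long> pointersOf(long vertexId) {
        List<Long> list = pointers.get(vertexId);
        return list == null ? Collections.<Long>emptyList() : list;
    }

    public static void main(String[] args) {
        ToyPointerListStore store = new ToyPointerListStore();
        store.addMessage(1L, "ab");
        store.addMessage(2L, "xyz");
        store.addMessage(1L, "c");
        System.out.println(store.pointersOf(1L) + " -> " + store.getVertexMessages(1L));
    }
}
```

The per-vertex cost in this sketch is one `long` per message, while the message payloads stay in a single serialized buffer.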
public Iterable<M> getVertexMessages(I vertexId)
Gets messages for a vertex.
getVertexMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId - Vertex id for which we want to get messages

public void clearVertexMessages(I vertexId)
Clears messages for a vertex.
clearVertexMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId - Vertex id for which we want to clear messages

public void clearAll()
Clears all resources used by this store.
clearAll in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
public boolean hasMessagesForVertex(I vertexId)
Check if we have messages for some vertex
hasMessagesForVertex in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId - Id of vertex which we want to check

public boolean hasMessagesForPartition(int partitionId)
Check if we have messages for some partition
hasMessagesForPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId - Id of partition which we want to check

public void addPartitionMessages(int partitionId, VertexIdMessages<I,M> messages)
Adds messages for partition
addPartitionMessages in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId - Id of partition
messages - Collection of vertex ids and messages we want to add

public void addMessage(I vertexId, M message) throws IOException
Adds a message for a particular vertex
addMessage in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId - Id of target vertex
message - A message to send
Throws: IOException
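The difference between the single-message path (`addMessage`) and the batch path (`addPartitionMessages`), together with the query and clear methods above, can be sketched with a toy in-memory store. Everything here is an assumption for illustration: `ToyMessageStore` is not a Giraph class, vertex ids are plain `long`s, messages are `String`s, and the modulo partitioning rule is invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy store mirroring the method names above; not a Giraph class. */
class ToyMessageStore {
    private final int partitionCount;
    // partitionId -> (vertexId -> messages)
    private final Map<Integer, Map<Long, List<String>>> partitions = new HashMap<>();

    ToyMessageStore(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    /** Assumed partitioning rule: vertex id modulo partition count. */
    int partitionOf(long vertexId) {
        return (int) Math.floorMod(vertexId, (long) partitionCount);
    }

    /** Single-message path: the partition is looked up on every call. */
    void addMessage(long vertexId, String message) {
        partitions.computeIfAbsent(partitionOf(vertexId), p -> new HashMap<>())
            .computeIfAbsent(vertexId, v -> new ArrayList<>())
            .add(message);
    }

    /** Batch path: the partition is resolved once for the whole batch. */
    void addPartitionMessages(int partitionId, Map<Long, List<String>> batch) {
        Map<Long, List<String>> partition =
            partitions.computeIfAbsent(partitionId, p -> new HashMap<>());
        for (Map.Entry<Long, List<String>> e : batch.entrySet()) {
            partition.computeIfAbsent(e.getKey(), v -> new ArrayList<>()).addAll(e.getValue());
        }
    }

    boolean hasMessagesForPartition(int partitionId) {
        Map<Long, List<String>> partition = partitions.get(partitionId);
        return partition != null && !partition.isEmpty();
    }

    boolean hasMessagesForVertex(long vertexId) {
        return !getVertexMessages(vertexId).isEmpty();
    }

    List<String> getVertexMessages(long vertexId) {
        Map<Long, List<String>> partition = partitions.get(partitionOf(vertexId));
        List<String> messages = partition == null ? null : partition.get(vertexId);
        return messages == null
            ? Collections.<String>emptyList()
            : Collections.unmodifiableList(messages);
    }

    void clearVertexMessages(long vertexId) {
        Map<Long, List<String>> partition = partitions.get(partitionOf(vertexId));
        if (partition != null) {
            partition.remove(vertexId);
        }
    }

    public static void main(String[] args) {
        ToyMessageStore store = new ToyMessageStore(4);
        store.addMessage(5L, "a");
        Map<Long, List<String>> batch = new HashMap<>();
        batch.put(5L, Arrays.asList("b", "c"));
        store.addPartitionMessages(store.partitionOf(5L), batch);
        System.out.println(store.getVertexMessages(5L));
    }
}
```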
public long loadPartitionData(int partitionId) throws IOException
Loads and assembles all data for a given partition, and puts it into the data store.
loadPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition to load and assemble all data for
Throws: IOException
public long offloadPartitionData(int partitionId) throws IOException
Offloads partition data of a given partition in the data store to disk, and returns the number of bytes offloaded from memory to disk.
offloadPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition whose data is to be offloaded
Throws: IOException
public long offloadBuffers(int partitionId) throws IOException
Offloads raw data buffers of a given partition to disk, and returns the number of bytes offloaded from memory to disk.
offloadBuffers in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition whose raw data buffers are to be offloaded
Throws: IOException
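The contract shared by the offload and load methods above (move a partition between memory and disk, and report how many bytes were moved) can be sketched as follows. This is a simplified illustration under assumptions, not Giraph's out-of-core machinery: `ToyDiskBackedStore`, the temp-file layout, and the `String` message type are hypothetical, and checked `IOException`s are wrapped so the example stays short.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of an offload/load cycle; not a Giraph class. */
class ToyDiskBackedStore {
    private final Map<Integer, List<String>> inMemory = new HashMap<>();
    private final Map<Integer, Path> onDisk = new HashMap<>();

    void add(int partitionId, String message) {
        inMemory.computeIfAbsent(partitionId, p -> new ArrayList<>()).add(message);
    }

    /** Writes the partition to a temp file, frees the memory, returns bytes written. */
    long offloadPartitionData(int partitionId) {
        List<String> messages = inMemory.remove(partitionId);
        if (messages == null) {
            return 0L; // nothing in memory for this partition
        }
        try {
            Path file = Files.createTempFile("partition-" + partitionId + "-", ".bin");
            file.toFile().deleteOnExit();
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(file)))) {
                out.writeInt(messages.size());
                for (String message : messages) {
                    out.writeUTF(message);
                }
                onDisk.put(partitionId, file);
                return out.size(); // total bytes written through this stream
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the partition back into memory, deletes the file, returns bytes read. */
    long loadPartitionData(int partitionId) {
        Path file = onDisk.remove(partitionId);
        if (file == null) {
            return 0L; // nothing on disk for this partition
        }
        try {
            long bytes = Files.size(file);
            try (DataInputStream in = new DataInputStream(
                    new BufferedInputStream(Files.newInputStream(file)))) {
                int count = in.readInt();
                List<String> messages = new ArrayList<>(count);
                for (int i = 0; i < count; i++) {
                    messages.add(in.readUTF());
                }
                inMemory.put(partitionId, messages);
            }
            Files.delete(file);
            return bytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isInMemory(int partitionId) {
        return inMemory.containsKey(partitionId);
    }

    List<String> get(int partitionId) {
        return inMemory.get(partitionId);
    }

    public static void main(String[] args) {
        ToyDiskBackedStore store = new ToyDiskBackedStore();
        store.add(3, "hello");
        store.add(3, "world");
        System.out.println("offloaded bytes: " + store.offloadPartitionData(3));
        System.out.println("loaded bytes: " + store.loadPartitionData(3));
    }
}
```

Returning the byte count from both directions lets a caller keep a running estimate of how much memory an offload freed or a load consumed.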
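public void finalizeStore()
Called before start of computation in bspworker. Since it is run from a single thread while the store is not being accessed by any other thread, this is ensured to be thread-safe.
finalizeStore in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>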
public Iterable<I> getPartitionDestinationVertices(int partitionId)
Gets vertex ids from selected partition which we have messages for
getPartitionDestinationVertices in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId - Id of partition

public void clearPartition(int partitionId)
Clears messages for a partition.
clearPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId - Partition id for which we want to clear messages

public void writePartition(DataOutput out, int partitionId) throws IOException
Serialize messages for one partition.
writePartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
out - DataOutput to serialize this object into
partitionId - Id of partition
Throws: IOException
public void readFieldsForPartition(DataInput in, int partitionId) throws IOException
Deserialize messages for one partition
readFieldsForPartition in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
in - DataInput to deserialize this object from.
partitionId - Id of partition
Throws: IOException
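`writePartition` and `readFieldsForPartition` form a serialize/deserialize pair over `DataOutput` and `DataInput`. The sketch below shows one way such a pair can round-trip a partition. The wire layout (vertex count, then per vertex an id, a message count, and the messages) and the `ToyPartitionCodec` class are assumptions made for illustration; the source does not describe Giraph's actual layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-partition codec; the wire layout is an assumption. */
class ToyPartitionCodec {
    /** Serializes vertexId -> messages for a single partition. */
    static void writePartition(DataOutput out, Map<Long, List<String>> partition)
            throws IOException {
        out.writeInt(partition.size());
        for (Map.Entry<Long, List<String>> entry : partition.entrySet()) {
            out.writeLong(entry.getKey());
            out.writeInt(entry.getValue().size());
            for (String message : entry.getValue()) {
                out.writeUTF(message);
            }
        }
    }

    /** Reads back exactly what writePartition produced. */
    static Map<Long, List<String>> readFieldsForPartition(DataInput in) throws IOException {
        int vertexCount = in.readInt();
        Map<Long, List<String>> partition = new LinkedHashMap<>();
        for (int v = 0; v < vertexCount; v++) {
            long vertexId = in.readLong();
            int messageCount = in.readInt();
            List<String> messages = new ArrayList<>(messageCount);
            for (int m = 0; m < messageCount; m++) {
                messages.add(in.readUTF());
            }
            partition.put(vertexId, messages);
        }
        return partition;
    }

    /** Convenience helper: serialize to a byte array and deserialize again. */
    static Map<Long, List<String>> roundTrip(Map<Long, List<String>> partition) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writePartition(new DataOutputStream(bytes), partition);
            return readFieldsForPartition(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Long, List<String>> partition = new LinkedHashMap<>();
        partition.put(7L, Arrays.asList("x", "y"));
        System.out.println(roundTrip(partition).equals(partition));
    }
}
```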
protected void writeEntry(VertexIdMessages<I,M> messages, DataOutput out) throws IOException
Writes a single raw entry to a given output stream.
writeEntry in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
messages - entry to write to output
out - output stream to write the entry to
Throws: IOException
protected VertexIdMessages<I,M> readNextEntry(DataInput in) throws IOException
Reads the next available raw entry from a given input stream.
readNextEntry in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
in - input stream to read the entry from
Throws: IOException
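The trio `entrySerializedSize`, `writeEntry`, and `readNextEntry` describes a raw-entry codec: one method predicts an entry's size in bytes, one appends it to a stream, and one reads the next entry back. A minimal sketch of that pattern follows. `ToyEntryCodec` and its fixed-width `Entry` (a `long` vertex id plus an `int` message) are hypothetical stand-ins for `VertexIdMessages<I,M>`, chosen so that the predicted size can be checked against the bytes actually written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical raw-entry codec; Entry stands in for VertexIdMessages. */
class ToyEntryCodec {
    static final class Entry {
        final long vertexId;
        final int message;

        Entry(long vertexId, int message) {
            this.vertexId = vertexId;
            this.message = message;
        }
    }

    /** Size in bytes that writeEntry will produce for this entry. */
    static int entrySerializedSize(Entry entry) {
        return Long.BYTES + Integer.BYTES;
    }

    static void writeEntry(Entry entry, DataOutput out) throws IOException {
        out.writeLong(entry.vertexId);
        out.writeInt(entry.message);
    }

    static Entry readNextEntry(DataInput in) throws IOException {
        return new Entry(in.readLong(), in.readInt());
    }

    /** Appends all entries to one raw buffer. */
    static byte[] writeAll(List<Entry> entries) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (Entry entry : entries) {
                writeEntry(entry, out);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads entries one by one until the raw buffer is exhausted. */
    static List<Entry> readAll(byte[] raw) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            List<Entry> entries = new ArrayList<>();
            while (in.available() > 0) {
                entries.add(readNextEntry(in));
            }
            return entries;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = writeAll(Arrays.asList(new Entry(1L, 10), new Entry(2L, 20)));
        System.out.println(raw.length + " bytes, " + readAll(raw).size() + " entries");
    }
}
```

Knowing an entry's serialized size before writing it is what allows a store to track how much buffered data it holds without serializing anything first.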
protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Loads data of a partition into the data store.
loadInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition whose data is to be loaded
ioThreadId - id of the IO thread performing the load
index - data index chain for the data to load
Throws: IOException
protected long offloadInMemoryPartitionData(int partitionId, int ioThreadId, DataIndex index) throws IOException
Offloads data of a partition in data store to disk.
offloadInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition to offload to disk
ioThreadId - id of the IO thread performing the offload
index - data index chain for the data to offload
Throws: IOException
protected int entrySerializedSize(VertexIdMessages<I,M> messages)
Gets the size of a given entry in bytes.
entrySerializedSize in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
messages - input entry whose size is to be computed

protected void addEntryToInMemoryPartitionData(int partitionId, VertexIdMessages<I,M> messages)
Adds a single entry for a given partition to the in-memory data store.
addEntryToInMemoryPartitionData in class DiskBackedDataStore<VertexIdMessages<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>>
partitionId - id of the partition to add the data to
messages - input entry to add to the data store

Copyright © 2011-2020 The Apache Software Foundation. All Rights Reserved.