I
- Vertex idM
- Message dataT
- Type of object which holds messages for one vertexpublic abstract class SimpleMessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable,T> extends Object implements MessageStore<I,M>
MessageStore
which allows any kind
of object to hold messages for one vertex.
Simple in memory message store implemented with a two level concurrent
hash map.Modifier and Type | Field and Description |
---|---|
protected ImmutableClassesGiraphConfiguration<I,?,?> |
config
Giraph configuration
|
protected ConcurrentMap<Integer,ConcurrentMap<I,T>> |
map
Map from partition id to map from vertex id to messages for that vertex
|
protected MessageValueFactory<M> |
messageValueFactory
Message class
|
protected PartitionSplitInfo<I> |
partitionInfo
Partition split info
|
Constructor and Description |
---|
SimpleMessageStore(MessageValueFactory<M> messageValueFactory,
PartitionSplitInfo<I> partitionInfo,
ImmutableClassesGiraphConfiguration<I,?,?> config)
Constructor
|
Modifier and Type | Method and Description |
---|---|
void |
clearAll()
Clears all resources used by this store.
|
void |
clearPartition(int partitionId)
Clears messages for a partition.
|
void |
clearVertexMessages(I vertexId)
Clears messages for a vertex.
|
void |
finalizeStore()
Called before start of computation in bspworker
Since it is run from a single thread while the store is not being
accessed by any other thread - this is ensured to be thread-safe
|
protected abstract Iterable<M> |
getMessagesAsIterable(T messages)
Get messages as an iterable from message storage
|
protected abstract int |
getNumberOfMessagesIn(ConcurrentMap<I,T> partitionMap)
Get number of messages in partition map
|
protected ConcurrentMap<I,T> |
getOrCreatePartitionMap(int partitionId)
If there is already a map of messages related to the partition id
return that map, otherwise create a new one, put it in global map and
return it.
|
Iterable<I> |
getPartitionDestinationVertices(int partitionId)
Gets vertex ids from selected partition which we have messages for
|
protected int |
getPartitionId(I vertexId)
Get id of partition which holds vertex with selected id
|
Iterable<M> |
getVertexMessages(I vertexId)
Gets messages for a vertex.
|
boolean |
hasMessagesForPartition(int partitionId)
Check if we have messages for some partition
|
boolean |
hasMessagesForVertex(I vertexId)
Check if we have messages for some vertex
|
protected abstract T |
readFieldsForMessages(DataInput in)
Read message storage from
DataInput |
void |
readFieldsForPartition(DataInput in,
int partitionId)
Deserialize messages for one partition
|
protected abstract void |
writeMessages(T messages,
DataOutput out)
Write message storage to
DataOutput |
void |
writePartition(DataOutput out,
int partitionId)
Serialize messages for one partition.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
addMessage, addPartitionMessages, isPointerListEncoding
protected final MessageValueFactory<M extends org.apache.hadoop.io.Writable> messageValueFactory
protected final PartitionSplitInfo<I extends org.apache.hadoop.io.WritableComparable> partitionInfo
protected final ConcurrentMap<Integer,ConcurrentMap<I extends org.apache.hadoop.io.WritableComparable,T>> map
protected final ImmutableClassesGiraphConfiguration<I extends org.apache.hadoop.io.WritableComparable,?,?> config
public SimpleMessageStore(MessageValueFactory<M> messageValueFactory, PartitionSplitInfo<I> partitionInfo, ImmutableClassesGiraphConfiguration<I,?,?> config)
messageValueFactory
- Message class held in the storepartitionInfo
- Partition split infoconfig
- Giraph configurationprotected abstract Iterable<M> getMessagesAsIterable(T messages)
messages
- Message storageprotected abstract int getNumberOfMessagesIn(ConcurrentMap<I,T> partitionMap)
partitionMap
- Partition map in which to count messagesprotected abstract void writeMessages(T messages, DataOutput out) throws IOException
DataOutput
messages
- Message storageout
- Data outputIOException
protected abstract T readFieldsForMessages(DataInput in) throws IOException
DataInput
in
- Data inputIOException
protected int getPartitionId(I vertexId)
vertexId
- Id of vertexprotected ConcurrentMap<I,T> getOrCreatePartitionMap(int partitionId)
partitionId
- Id of partitionpublic void finalizeStore()
MessageStore
finalizeStore
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
public Iterable<I> getPartitionDestinationVertices(int partitionId)
MessageStore
getPartitionDestinationVertices
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId
- Id of partitionpublic boolean hasMessagesForVertex(I vertexId)
MessageStore
hasMessagesForVertex
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId
- Id of vertex which we want to checkpublic Iterable<M> getVertexMessages(I vertexId)
MessageStore
getVertexMessages
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId
- Vertex id for which we want to get messagespublic void writePartition(DataOutput out, int partitionId) throws IOException
MessageStore
writePartition
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
out
- DataOutput
to serialize this object intopartitionId
- Id of partitionIOException
public void readFieldsForPartition(DataInput in, int partitionId) throws IOException
MessageStore
readFieldsForPartition
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
in
- DataInput
to deserialize this object
from.partitionId
- Id of partitionIOException
public void clearVertexMessages(I vertexId)
MessageStore
clearVertexMessages
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
vertexId
- Vertex id for which we want to clear messagespublic void clearPartition(int partitionId)
MessageStore
clearPartition
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId
- Partition id for which we want to clear messagespublic boolean hasMessagesForPartition(int partitionId)
MessageStore
hasMessagesForPartition
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
partitionId
- Id of partition which we want to checkpublic void clearAll()
MessageStore
clearAll
in interface MessageStore<I extends org.apache.hadoop.io.WritableComparable,M extends org.apache.hadoop.io.Writable>
Copyright © 2011-2020 The Apache Software Foundation. All Rights Reserved.