1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.messages;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24
25 import org.apache.giraph.utils.VertexIdMessages;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29 /**
30 * Message store
31 *
32 * @param <I> Vertex id
33 * @param <M> Message data
34 */
35 public interface MessageStore<I extends WritableComparable,
36 M extends Writable> {
37 /**
38 * True if this message-store encodes messages as a list of long pointers
39 * to compact serialized messages
40 *
41 * @return true if we encode messages as a list of pointers
42 */
43 boolean isPointerListEncoding();
44
45 /**
46 * Gets messages for a vertex. The lifetime of every message is only
47 * guaranteed until the iterator's next() method is called. Do not hold
48 * references to objects returned by this iterator.
49 *
50 * @param vertexId Vertex id for which we want to get messages
51 * @return Iterable of messages for a vertex id
52 */
53 Iterable<M> getVertexMessages(I vertexId);
54
55 /**
56 * Clears messages for a vertex.
57 *
58 * @param vertexId Vertex id for which we want to clear messages
59 */
60 void clearVertexMessages(I vertexId);
61
62 /**
63 * Clears all resources used by this store.
64 */
65 void clearAll();
66
67 /**
68 * Check if we have messages for some vertex
69 *
70 * @param vertexId Id of vertex which we want to check
71 * @return True iff we have messages for vertex with required id
72 */
73 boolean hasMessagesForVertex(I vertexId);
74
75 /**
76 * Check if we have messages for some partition
77 *
78 * @param partitionId Id of partition which we want to check
79 * @return True iff we have messages for the given partition
80 */
81 boolean hasMessagesForPartition(int partitionId);
82
83 /**
84 * Adds messages for partition
85 *
86 * @param partitionId Id of partition
87 * @param messages Collection of vertex ids and messages we want to add
88 */
89 void addPartitionMessages(
90 int partitionId, VertexIdMessages<I, M> messages);
91
92 /**
93 * Adds a message for a particular vertex
94 * The method is used by InternalMessageStore to send local messages; for
95 * the general case, use a more efficient addPartitionMessages
96 *
97 * @param vertexId Id of target vertex
98 * @param message A message to send
99 * @throws IOException
100 */
101 void addMessage(I vertexId, M message) throws IOException;
102
103 /**
104 * Called before start of computation in bspworker
105 * Since it is run from a single thread while the store is not being
106 * accessed by any other thread - this is ensured to be thread-safe
107 */
108 void finalizeStore();
109
110 /**
111 * Gets vertex ids from selected partition which we have messages for
112 *
113 * @param partitionId Id of partition
114 * @return Iterable over vertex ids which we have messages for
115 */
116 Iterable<I> getPartitionDestinationVertices(int partitionId);
117
118 /**
119 * Clears messages for a partition.
120 *
121 * @param partitionId Partition id for which we want to clear messages
122 */
123 void clearPartition(int partitionId);
124
125 /**
126 * Serialize messages for one partition.
127 *
128 * @param out {@link DataOutput} to serialize this object into
129 * @param partitionId Id of partition
130 * @throws IOException
131 */
132 void writePartition(DataOutput out, int partitionId) throws IOException;
133
134 /**
135 * Deserialize messages for one partition
136 *
137 * @param in {@link DataInput} to deserialize this object
138 * from.
139 * @param partitionId Id of partition
140 * @throws IOException
141 */
142 void readFieldsForPartition(DataInput in,
143 int partitionId) throws IOException;
144 }