This project has retired. For details please refer to its Attic page.
DiskBackedMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  
25  import org.apache.giraph.comm.messages.MessageStore;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.factories.MessageValueFactory;
28  import org.apache.giraph.ooc.OutOfCoreEngine;
29  import org.apache.giraph.ooc.persistence.DataIndex;
30  import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
31  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
32  import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
33  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
34  import org.apache.giraph.utils.VertexIdMessages;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  import org.apache.log4j.Logger;
38  
39  
40  /**
41   * Implementation of a message store used for out-of-core mechanism.
42   *
43   * @param <I> Vertex id
44   * @param <M> Message data
45   */
46  public class DiskBackedMessageStore<I extends WritableComparable,
47      M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
48      implements MessageStore<I, M> {
49    /** Class logger. */
50    private static final Logger LOG =
51        Logger.getLogger(DiskBackedMessageStore.class);
52    /** Configuration */
53    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
54    /** In-memory message store */
55    private final MessageStore<I, M> messageStore;
56    /** Whether the message store uses message combiner or not */
57    private final boolean useMessageCombiner;
58    /** Which superstep this message store is used for */
59    private final long superstep;
60    /** Message value class */
61    private final MessageValueFactory<M> messageValueFactory;
62  
63    /**
64     * Type of VertexIdMessage class (container for serialized messages) received
65     * for a particular message. If we write the received messages to disk before
66     * adding them to message store, we need this type when we want to read the
67     * messages back from disk (so that we know how to read the messages from
68     * disk).
69     */
70    private enum SerializedMessageClass {
71      /** ByteArrayVertexIdMessages */
72      BYTE_ARRAY_VERTEX_ID_MESSAGES,
73      /** ByteArrayOneMEssageToManyIds */
74      BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
75    }
76  
77    /**
78     * Constructor
79     *
80     * @param config Configuration
81     * @param oocEngine Out-of-core engine
82     * @param messageStore In-memory message store for which out-of-core message
83     *                     store would be wrapper
84     * @param useMessageCombiner Whether message combiner is used for this message
85     *                           store
86     * @param superstep superstep number this messages store is used for
87     */
88    public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
89                                      config,
90                                  OutOfCoreEngine oocEngine,
91                                  MessageStore<I, M> messageStore,
92                                  boolean useMessageCombiner, long superstep) {
93      super(config, oocEngine);
94      this.config = config;
95      this.messageStore = messageStore;
96      this.useMessageCombiner = useMessageCombiner;
97      this.superstep = superstep;
98      this.messageValueFactory = config.createOutgoingMessageValueFactory();
99    }
100 
101   @Override
102   public boolean isPointerListEncoding() {
103     return messageStore.isPointerListEncoding();
104   }
105 
106   @Override
107   public Iterable<M> getVertexMessages(I vertexId) {
108     return messageStore.getVertexMessages(vertexId);
109   }
110 
111   @Override
112   public void clearVertexMessages(I vertexId) {
113     messageStore.clearVertexMessages(vertexId);
114   }
115 
116   @Override
117   public void clearAll() {
118     messageStore.clearAll();
119   }
120 
121   @Override
122   public boolean hasMessagesForVertex(I vertexId) {
123     return messageStore.hasMessagesForVertex(vertexId);
124   }
125 
126   @Override
127   public boolean hasMessagesForPartition(int partitionId) {
128     return messageStore.hasMessagesForPartition(partitionId);
129   }
130 
131   @Override
132   public void addPartitionMessages(
133       int partitionId, VertexIdMessages<I, M> messages) {
134     if (useMessageCombiner) {
135       messageStore.addPartitionMessages(partitionId, messages);
136     } else {
137       addEntry(partitionId, messages);
138     }
139   }
140 
141   @Override
142   public void addMessage(I vertexId, M message) throws IOException {
143     if (useMessageCombiner) {
144       messageStore.addMessage(vertexId, message);
145     } else {
146       // TODO: implement if LocalBlockRunner needs this message store
147       throw new UnsupportedOperationException();
148     }
149   }
150 
151   /**
152    * Gets the path that should be used specifically for message data.
153    *
154    * @param basePath path prefix to build the actual path from
155    * @param superstep superstep for which message data should be stored
156    * @return path to files specific for message data
157    */
158   private static String getPath(String basePath, long superstep) {
159     return basePath + "_messages-S" + superstep;
160   }
161 
162   @Override
163   public long loadPartitionData(int partitionId)
164       throws IOException {
165     if (!useMessageCombiner) {
166       return loadPartitionDataProxy(partitionId,
167           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
168               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
169     } else {
170       return 0;
171     }
172   }
173 
174   @Override
175   public long offloadPartitionData(int partitionId)
176       throws IOException {
177     if (!useMessageCombiner) {
178       return offloadPartitionDataProxy(partitionId,
179           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
180               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
181     } else {
182       return 0;
183     }
184   }
185 
186   @Override
187   public long offloadBuffers(int partitionId)
188       throws IOException {
189     if (!useMessageCombiner) {
190       return offloadBuffersProxy(partitionId,
191           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
192               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
193     } else {
194       return 0;
195     }
196   }
197 
198   @Override
199   public void finalizeStore() {
200     messageStore.finalizeStore();
201   }
202 
203   @Override
204   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
205     return messageStore.getPartitionDestinationVertices(partitionId);
206   }
207 
208   @Override
209   public void clearPartition(int partitionId) {
210     messageStore.clearPartition(partitionId);
211   }
212 
213   @Override
214   public void writePartition(DataOutput out, int partitionId)
215       throws IOException {
216     messageStore.writePartition(out, partitionId);
217   }
218 
219   @Override
220   public void readFieldsForPartition(DataInput in, int partitionId)
221       throws IOException {
222     messageStore.readFieldsForPartition(in, partitionId);
223   }
224 
225   @Override
226   protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
227       throws IOException {
228     SerializedMessageClass messageClass;
229     if (messages instanceof ByteArrayVertexIdMessages) {
230       messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
231     } else if (messages instanceof ByteArrayOneMessageToManyIds) {
232       messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
233     } else {
234       throw new IllegalStateException("writeEntry: serialized message " +
235           "type is not supported");
236     }
237     out.writeByte(messageClass.ordinal());
238     messages.write(out);
239   }
240 
241   @Override
242   protected VertexIdMessages<I, M> readNextEntry(DataInput in)
243       throws IOException {
244     byte messageType = in.readByte();
245     SerializedMessageClass messageClass =
246         SerializedMessageClass.values()[messageType];
247     VertexIdMessages<I, M> vim;
248     switch (messageClass) {
249     case BYTE_ARRAY_VERTEX_ID_MESSAGES:
250       vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
251       vim.setConf(config);
252       break;
253     case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
254       vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
255       vim.setConf(config);
256       break;
257     default:
258       throw new IllegalStateException("readNextEntry: unsupported " +
259           "serialized message type!");
260     }
261     vim.readFields(in);
262     return vim;
263   }
264 
265   @Override
266   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
267                                            DataIndex index) throws IOException {
268     long numBytes = 0;
269     if (hasPartitionDataOnFile.remove(partitionId)) {
270       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
271           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
272       messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
273           partitionId);
274       numBytes = inputWrapper.finalizeInput(true);
275     }
276     return numBytes;
277   }
278 
279   @Override
280   protected long offloadInMemoryPartitionData(
281       int partitionId, int ioThreadId, DataIndex index) throws IOException {
282     long numBytes = 0;
283     if (messageStore.hasMessagesForPartition(partitionId)) {
284       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
285           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
286               false);
287       messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
288       messageStore.clearPartition(partitionId);
289       numBytes = outputWrapper.finalizeOutput();
290       hasPartitionDataOnFile.add(partitionId);
291     }
292     return numBytes;
293   }
294 
295   @Override
296   protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
297     return messages.getSerializedSize();
298   }
299 
300   @Override
301   protected void addEntryToInMemoryPartitionData(int partitionId,
302                                                  VertexIdMessages<I, M>
303                                                      messages) {
304     messageStore.addPartitionMessages(partitionId, messages);
305   }
306 }