View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import org.apache.giraph.comm.messages.MessageStore;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.factories.MessageValueFactory;
24  import org.apache.giraph.ooc.OutOfCoreEngine;
25  import org.apache.giraph.ooc.persistence.DataIndex;
26  import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
27  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
28  import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
29  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
30  import org.apache.giraph.utils.VertexIdMessages;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.io.WritableComparable;
33  import org.apache.log4j.Logger;
34  
35  import java.io.DataInput;
36  import java.io.DataOutput;
37  import java.io.IOException;
38  
39  /**
40   * Implementation of a message store used for out-of-core mechanism.
41   *
42   * @param <I> Vertex id
43   * @param <M> Message data
44   */
45  public class DiskBackedMessageStore<I extends WritableComparable,
46      M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
47      implements MessageStore<I, M> {
48    /** Class logger. */
49    private static final Logger LOG =
50        Logger.getLogger(DiskBackedMessageStore.class);
51    /** Configuration */
52    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
53    /** In-memory message store */
54    private final MessageStore<I, M> messageStore;
55    /** Whether the message store uses message combiner or not */
56    private final boolean useMessageCombiner;
57    /** Which superstep this message store is used for */
58    private final long superstep;
59    /** Message value class */
60    private final MessageValueFactory<M> messageValueFactory;
61  
62    /**
63     * Type of VertexIdMessage class (container for serialized messages) received
64     * for a particular message. If we write the received messages to disk before
65     * adding them to message store, we need this type when we want to read the
66     * messages back from disk (so that we know how to read the messages from
67     * disk).
68     */
69    private enum SerializedMessageClass {
70      /** ByteArrayVertexIdMessages */
71      BYTE_ARRAY_VERTEX_ID_MESSAGES,
72      /** ByteArrayOneMEssageToManyIds */
73      BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
74    }
75  
76    /**
77     * Constructor
78     *
79     * @param config Configuration
80     * @param oocEngine Out-of-core engine
81     * @param messageStore In-memory message store for which out-of-core message
82     *                     store would be wrapper
83     * @param useMessageCombiner Whether message combiner is used for this message
84     *                           store
85     * @param superstep superstep number this messages store is used for
86     */
87    public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
88                                      config,
89                                  OutOfCoreEngine oocEngine,
90                                  MessageStore<I, M> messageStore,
91                                  boolean useMessageCombiner, long superstep) {
92      super(config, oocEngine);
93      this.config = config;
94      this.messageStore = messageStore;
95      this.useMessageCombiner = useMessageCombiner;
96      this.superstep = superstep;
97      this.messageValueFactory = config.createOutgoingMessageValueFactory();
98    }
99  
100   @Override
101   public boolean isPointerListEncoding() {
102     return messageStore.isPointerListEncoding();
103   }
104 
105   @Override
106   public Iterable<M> getVertexMessages(I vertexId) {
107     return messageStore.getVertexMessages(vertexId);
108   }
109 
110   @Override
111   public void clearVertexMessages(I vertexId) {
112     messageStore.clearVertexMessages(vertexId);
113   }
114 
115   @Override
116   public void clearAll() {
117     messageStore.clearAll();
118   }
119 
120   @Override
121   public boolean hasMessagesForVertex(I vertexId) {
122     return messageStore.hasMessagesForVertex(vertexId);
123   }
124 
125   @Override
126   public boolean hasMessagesForPartition(int partitionId) {
127     return messageStore.hasMessagesForPartition(partitionId);
128   }
129 
130   @Override
131   public void addPartitionMessages(
132       int partitionId, VertexIdMessages<I, M> messages) {
133     if (useMessageCombiner) {
134       messageStore.addPartitionMessages(partitionId, messages);
135     } else {
136       addEntry(partitionId, messages);
137     }
138   }
139 
140 
141   @Override
142   public long loadPartitionData(int partitionId)
143       throws IOException {
144     if (!useMessageCombiner) {
145       return loadPartitionDataProxy(partitionId,
146           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
147               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
148     } else {
149       return 0;
150     }
151   }
152 
153   @Override
154   public long offloadPartitionData(int partitionId)
155       throws IOException {
156     if (!useMessageCombiner) {
157       return offloadPartitionDataProxy(partitionId,
158           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
159               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
160     } else {
161       return 0;
162     }
163   }
164 
165   @Override
166   public long offloadBuffers(int partitionId)
167       throws IOException {
168     if (!useMessageCombiner) {
169       return offloadBuffersProxy(partitionId,
170           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
171               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
172     } else {
173       return 0;
174     }
175   }
176 
177   @Override
178   public void finalizeStore() {
179     messageStore.finalizeStore();
180   }
181 
182   @Override
183   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
184     return messageStore.getPartitionDestinationVertices(partitionId);
185   }
186 
187   @Override
188   public void clearPartition(int partitionId) {
189     messageStore.clearPartition(partitionId);
190   }
191 
192   @Override
193   public void writePartition(DataOutput out, int partitionId)
194       throws IOException {
195     messageStore.writePartition(out, partitionId);
196   }
197 
198   @Override
199   public void readFieldsForPartition(DataInput in, int partitionId)
200       throws IOException {
201     messageStore.readFieldsForPartition(in, partitionId);
202   }
203 
204   @Override
205   protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
206       throws IOException {
207     SerializedMessageClass messageClass;
208     if (messages instanceof ByteArrayVertexIdMessages) {
209       messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
210     } else if (messages instanceof ByteArrayOneMessageToManyIds) {
211       messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
212     } else {
213       throw new IllegalStateException("writeEntry: serialized message " +
214           "type is not supported");
215     }
216     out.writeByte(messageClass.ordinal());
217     messages.write(out);
218   }
219 
220   @Override
221   protected VertexIdMessages<I, M> readNextEntry(DataInput in)
222       throws IOException {
223     byte messageType = in.readByte();
224     SerializedMessageClass messageClass =
225         SerializedMessageClass.values()[messageType];
226     VertexIdMessages<I, M> vim;
227     switch (messageClass) {
228     case BYTE_ARRAY_VERTEX_ID_MESSAGES:
229       vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
230       vim.setConf(config);
231       break;
232     case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
233       vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
234       vim.setConf(config);
235       break;
236     default:
237       throw new IllegalStateException("readNextEntry: unsupported " +
238           "serialized message type!");
239     }
240     vim.readFields(in);
241     return vim;
242   }
243 
244   @Override
245   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
246                                            DataIndex index) throws IOException {
247     long numBytes = 0;
248     if (hasPartitionDataOnFile.remove(partitionId)) {
249       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
250           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
251       messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
252           partitionId);
253       numBytes = inputWrapper.finalizeInput(true);
254     }
255     return numBytes;
256   }
257 
258   @Override
259   protected long offloadInMemoryPartitionData(
260       int partitionId, int ioThreadId, DataIndex index) throws IOException {
261     long numBytes = 0;
262     if (messageStore.hasMessagesForPartition(partitionId)) {
263       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
264           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
265               false);
266       messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
267       messageStore.clearPartition(partitionId);
268       numBytes = outputWrapper.finalizeOutput();
269       hasPartitionDataOnFile.add(partitionId);
270     }
271     return numBytes;
272   }
273 
274   @Override
275   protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
276     return messages.getSerializedSize();
277   }
278 
279   @Override
280   protected void addEntryToInMemoryPartitionData(int partitionId,
281                                                  VertexIdMessages<I, M>
282                                                      messages) {
283     messageStore.addPartitionMessages(partitionId, messages);
284   }
285 }