This project has retired. For details please refer to its
        
        Attic page.
      
 
DiskBackedMessageStore xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.ooc.data;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  
25  import org.apache.giraph.comm.messages.MessageStore;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.factories.MessageValueFactory;
28  import org.apache.giraph.ooc.OutOfCoreEngine;
29  import org.apache.giraph.ooc.persistence.DataIndex;
30  import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
31  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
32  import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
33  import org.apache.giraph.utils.ByteArrayVertexIdMessages;
34  import org.apache.giraph.utils.VertexIdMessages;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  import org.apache.log4j.Logger;
38  
39  
40  
41  
42  
43  
44  
45  
46  public class DiskBackedMessageStore<I extends WritableComparable,
47      M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
48      implements MessageStore<I, M> {
49    
50    private static final Logger LOG =
51        Logger.getLogger(DiskBackedMessageStore.class);
52    
53    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
54    
55    private final MessageStore<I, M> messageStore;
56    
57    private final boolean useMessageCombiner;
58    
59    private final long superstep;
60    
61    private final MessageValueFactory<M> messageValueFactory;
62  
63    
64  
65  
66  
67  
68  
69  
70    private enum SerializedMessageClass {
71      
72      BYTE_ARRAY_VERTEX_ID_MESSAGES,
73      
74      BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
75    }
76  
77    
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88    public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
89                                      config,
90                                  OutOfCoreEngine oocEngine,
91                                  MessageStore<I, M> messageStore,
92                                  boolean useMessageCombiner, long superstep) {
93      super(config, oocEngine);
94      this.config = config;
95      this.messageStore = messageStore;
96      this.useMessageCombiner = useMessageCombiner;
97      this.superstep = superstep;
98      this.messageValueFactory = config.createOutgoingMessageValueFactory();
99    }
100 
101   @Override
102   public boolean isPointerListEncoding() {
103     return messageStore.isPointerListEncoding();
104   }
105 
106   @Override
107   public Iterable<M> getVertexMessages(I vertexId) {
108     return messageStore.getVertexMessages(vertexId);
109   }
110 
111   @Override
112   public void clearVertexMessages(I vertexId) {
113     messageStore.clearVertexMessages(vertexId);
114   }
115 
116   @Override
117   public void clearAll() {
118     messageStore.clearAll();
119   }
120 
121   @Override
122   public boolean hasMessagesForVertex(I vertexId) {
123     return messageStore.hasMessagesForVertex(vertexId);
124   }
125 
126   @Override
127   public boolean hasMessagesForPartition(int partitionId) {
128     return messageStore.hasMessagesForPartition(partitionId);
129   }
130 
131   @Override
132   public void addPartitionMessages(
133       int partitionId, VertexIdMessages<I, M> messages) {
134     if (useMessageCombiner) {
135       messageStore.addPartitionMessages(partitionId, messages);
136     } else {
137       addEntry(partitionId, messages);
138     }
139   }
140 
141   @Override
142   public void addMessage(I vertexId, M message) throws IOException {
143     if (useMessageCombiner) {
144       messageStore.addMessage(vertexId, message);
145     } else {
146       
147       throw new UnsupportedOperationException();
148     }
149   }
150 
151   
152 
153 
154 
155 
156 
157 
158   private static String getPath(String basePath, long superstep) {
159     return basePath + "_messages-S" + superstep;
160   }
161 
162   @Override
163   public long loadPartitionData(int partitionId)
164       throws IOException {
165     if (!useMessageCombiner) {
166       return loadPartitionDataProxy(partitionId,
167           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
168               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
169     } else {
170       return 0;
171     }
172   }
173 
174   @Override
175   public long offloadPartitionData(int partitionId)
176       throws IOException {
177     if (!useMessageCombiner) {
178       return offloadPartitionDataProxy(partitionId,
179           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
180               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
181     } else {
182       return 0;
183     }
184   }
185 
186   @Override
187   public long offloadBuffers(int partitionId)
188       throws IOException {
189     if (!useMessageCombiner) {
190       return offloadBuffersProxy(partitionId,
191           new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
192               .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
193     } else {
194       return 0;
195     }
196   }
197 
198   @Override
199   public void finalizeStore() {
200     messageStore.finalizeStore();
201   }
202 
203   @Override
204   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
205     return messageStore.getPartitionDestinationVertices(partitionId);
206   }
207 
208   @Override
209   public void clearPartition(int partitionId) {
210     messageStore.clearPartition(partitionId);
211   }
212 
213   @Override
214   public void writePartition(DataOutput out, int partitionId)
215       throws IOException {
216     messageStore.writePartition(out, partitionId);
217   }
218 
219   @Override
220   public void readFieldsForPartition(DataInput in, int partitionId)
221       throws IOException {
222     messageStore.readFieldsForPartition(in, partitionId);
223   }
224 
225   @Override
226   protected void writeEntry(VertexIdMessages<I, M> messages, DataOutput out)
227       throws IOException {
228     SerializedMessageClass messageClass;
229     if (messages instanceof ByteArrayVertexIdMessages) {
230       messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
231     } else if (messages instanceof ByteArrayOneMessageToManyIds) {
232       messageClass = SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
233     } else {
234       throw new IllegalStateException("writeEntry: serialized message " +
235           "type is not supported");
236     }
237     out.writeByte(messageClass.ordinal());
238     messages.write(out);
239   }
240 
241   @Override
242   protected VertexIdMessages<I, M> readNextEntry(DataInput in)
243       throws IOException {
244     byte messageType = in.readByte();
245     SerializedMessageClass messageClass =
246         SerializedMessageClass.values()[messageType];
247     VertexIdMessages<I, M> vim;
248     switch (messageClass) {
249     case BYTE_ARRAY_VERTEX_ID_MESSAGES:
250       vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
251       vim.setConf(config);
252       break;
253     case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
254       vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
255       vim.setConf(config);
256       break;
257     default:
258       throw new IllegalStateException("readNextEntry: unsupported " +
259           "serialized message type!");
260     }
261     vim.readFields(in);
262     return vim;
263   }
264 
265   @Override
266   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
267                                            DataIndex index) throws IOException {
268     long numBytes = 0;
269     if (hasPartitionDataOnFile.remove(partitionId)) {
270       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
271           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
272       messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
273           partitionId);
274       numBytes = inputWrapper.finalizeInput(true);
275     }
276     return numBytes;
277   }
278 
279   @Override
280   protected long offloadInMemoryPartitionData(
281       int partitionId, int ioThreadId, DataIndex index) throws IOException {
282     long numBytes = 0;
283     if (messageStore.hasMessagesForPartition(partitionId)) {
284       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
285           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
286               false);
287       messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
288       messageStore.clearPartition(partitionId);
289       numBytes = outputWrapper.finalizeOutput();
290       hasPartitionDataOnFile.add(partitionId);
291     }
292     return numBytes;
293   }
294 
295   @Override
296   protected int entrySerializedSize(VertexIdMessages<I, M> messages) {
297     return messages.getSerializedSize();
298   }
299 
300   @Override
301   protected void addEntryToInMemoryPartitionData(int partitionId,
302                                                  VertexIdMessages<I, M>
303                                                      messages) {
304     messageStore.addPartitionMessages(partitionId, messages);
305   }
306 }