This project has retired. For details please refer to its Attic page.
DiskBackedPartitionStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import com.google.common.collect.Maps;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.OutEdges;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.ooc.OutOfCoreEngine;
27  import org.apache.giraph.ooc.persistence.DataIndex;
28  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
29  import org.apache.giraph.partition.Partition;
30  import org.apache.giraph.partition.PartitionStore;
31  import org.apache.giraph.utils.ExtendedDataOutput;
32  import org.apache.giraph.utils.WritableUtils;
33  import org.apache.giraph.worker.BspServiceWorker;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.Mapper;
37  import org.apache.log4j.Logger;
38  
39  import java.io.DataInput;
40  import java.io.DataOutput;
41  import java.io.IOException;
42  import java.util.Map;
43  
44  import static com.google.common.base.Preconditions.checkNotNull;
45  
46  /**
47   * Implementation of a partition-store used for out-of-core mechanism.
48   * Partition store is responsible for partition data, as well as data buffers in
49   * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
50   * refers to vertex buffers in INPUT_SUPERSTEP).
51   *
52   * @param <I> Vertex id
53   * @param <V> Vertex data
54   * @param <E> Edge data
55   */
56  public class DiskBackedPartitionStore<I extends WritableComparable,
57      V extends Writable, E extends Writable>
58      extends DiskBackedDataStore<ExtendedDataOutput>
59      implements PartitionStore<I, V, E> {
60    /** Class logger. */
61    private static final Logger LOG =
62        Logger.getLogger(DiskBackedPartitionStore.class);
63    /** Configuration */
64    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
65    /** Job context (for progress) */
66    private final Mapper<?, ?, ?, ?>.Context context;
67    /** In-memory partition store */
68    private final PartitionStore<I, V, E> partitionStore;
69    /**
70     * Keeps number of vertices in partitions, right when they are last spilled
71     * to the disk. This value may be inaccurate for in-memory partitions, but
72     * is accurate for out-of-core partitions.
73     */
74    private final Map<Integer, Long> partitionVertexCount =
75        Maps.newConcurrentMap();
76    /**
77     * Keeps number of edges in partitions, right when they are last spilled
78     * to the disk. This value may be inaccurate for in-memory partitions, but
79     * is accurate for out-of-core partitions.
80     */
81    private final Map<Integer, Long> partitionEdgeCount =
82        Maps.newConcurrentMap();
83  
84    /**
85     * Constructor.
86     *
87     * @param partitionStore In-memory partition store for which out-of-code
88     *                       partition store would be a wrapper
89     * @param conf Configuration
90     * @param context Job context
91     * @param oocEngine Out-of-core engine
92     */
93    public DiskBackedPartitionStore(
94        PartitionStore<I, V, E> partitionStore,
95        ImmutableClassesGiraphConfiguration<I, V, E> conf,
96        Mapper<?, ?, ?, ?>.Context context,
97        OutOfCoreEngine oocEngine) {
98      super(conf, oocEngine);
99      this.partitionStore = partitionStore;
100     this.conf = conf;
101     this.context = context;
102   }
103 
104   @Override
105   public boolean addPartition(Partition<I, V, E> partition) {
106     boolean added = partitionStore.addPartition(partition);
107     if (added) {
108       oocEngine.getMetaPartitionManager()
109           .addPartition(partition.getId());
110     }
111     return added;
112   }
113 
114   @Override
115   public Partition<I, V, E> removePartition(Integer partitionId) {
116     // Set the partition as 'in process' so its data and messages do not get
117     // spilled to disk until the remove is complete.
118     oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId);
119     oocEngine.retrievePartition(partitionId);
120     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
121     checkNotNull(partition, "removePartition: partition " + partitionId +
122         " is not in memory for removal!");
123     oocEngine.getMetaPartitionManager().removePartition(partitionId);
124     return partition;
125   }
126 
127   @Override
128   public boolean hasPartition(Integer partitionId) {
129     return oocEngine.getMetaPartitionManager().hasPartition(partitionId);
130   }
131 
132   @Override
133   public Iterable<Integer> getPartitionIds() {
134     return oocEngine.getMetaPartitionManager().getPartitionIds();
135   }
136 
137   @Override
138   public int getNumPartitions() {
139     return oocEngine.getMetaPartitionManager().getNumPartitions();
140   }
141 
142   @Override
143   public long getPartitionVertexCount(Integer partitionId) {
144     if (partitionStore.hasPartition(partitionId)) {
145       return partitionStore.getPartitionVertexCount(partitionId);
146     } else {
147       return partitionVertexCount.get(partitionId);
148     }
149   }
150 
151   @Override
152   public long getPartitionEdgeCount(Integer partitionId) {
153     if (partitionStore.hasPartition(partitionId)) {
154       return partitionStore.getPartitionEdgeCount(partitionId);
155     } else {
156       return partitionEdgeCount.get(partitionId);
157     }
158   }
159 
160   @Override
161   public boolean isEmpty() {
162     return getNumPartitions() == 0;
163   }
164 
165   @Override
166   public void startIteration() {
167     oocEngine.startIteration();
168   }
169 
170   @Override
171   public Partition<I, V, E> getNextPartition() {
172     Integer partitionId = oocEngine.getNextPartition();
173     if (partitionId == null) {
174       return null;
175     }
176     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
177     if (partition == null) {
178       if (LOG.isInfoEnabled()) {
179         LOG.info("getNextPartition: partition " + partitionId + " is not in " +
180             "the partition store. Creating an empty partition for it.");
181       }
182       partition = conf.createPartition(partitionId, context);
183     }
184     partitionStore.addPartition(partition);
185     return partition;
186   }
187 
188   @Override
189   public void putPartition(Partition<I, V, E> partition) {
190     oocEngine.doneProcessingPartition(partition.getId());
191   }
192 
193   @Override
194   public void addPartitionVertices(Integer partitionId,
195                                    ExtendedDataOutput extendedDataOutput) {
196     addEntry(partitionId, extendedDataOutput);
197   }
198 
199   @Override
200   public void shutdown() {
201     oocEngine.shutdown();
202   }
203 
204   @Override
205   public void initialize() {
206     oocEngine.initialize();
207   }
208 
209   /**
210    * Read vertex data from an input and initialize the vertex.
211    *
212    * @param in     The input stream
213    * @param vertex The vertex to initialize
214    * @throws IOException
215    */
216   private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
217       throws IOException {
218     I id = conf.createVertexId();
219     id.readFields(in);
220     V value = null;
221     boolean hasNullValue = in.readBoolean();
222     if (!hasNullValue) {
223       value = conf.createVertexValue();
224       value.readFields(in);
225     }
226     OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
227     vertex.initialize(id, value, edges);
228     if (in.readBoolean()) {
229       vertex.voteToHalt();
230     } else {
231       vertex.wakeUp();
232     }
233   }
234 
235   /**
236    * Read vertex edges from an input and set them to the vertex.
237    *
238    * @param in        The input stream
239    * @param partition The partition owning the vertex
240    * @throws IOException
241    */
242   private void readOutEdges(DataInput in, Partition<I, V, E> partition)
243       throws IOException {
244     I id = conf.createVertexId();
245     id.readFields(in);
246     Vertex<I, V, E> v = partition.getVertex(id);
247     if (v == null) {
248       throw new IllegalStateException("Vertex with ID " + id +
249         " not found in partition " + partition.getId() +
250         " which has " + partition.getVertexCount() + " vertices and " +
251         partition.getEdgeCount() + " edges.");
252     }
253     OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
254     edges.readFields(in);
255     partition.saveVertex(v);
256   }
257 
258   @Override
259   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
260                                            DataIndex index) throws IOException {
261     long numBytes = 0;
262     // Load vertices
263     if (hasPartitionDataOnFile.remove(partitionId)) {
264       Partition<I, V, E> partition = conf.createPartition(partitionId, context);
265       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
266       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
267       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
268           dataAccessor.prepareInput(ioThreadId, index.copy());
269       DataInput dataInput = inputWrapper.getDataInput();
270       long numVertices = dataInput.readLong();
271       for (long i = 0; i < numVertices; ++i) {
272         Vertex<I, V, E> vertex = conf.createVertex();
273         readVertexData(dataInput, vertex);
274         partition.putVertex(vertex);
275       }
276       numBytes += inputWrapper.finalizeInput(true);
277 
278       // Load edges
279       index.removeLastIndex()
280           .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
281       inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
282       dataInput = inputWrapper.getDataInput();
283       for (int i = 0; i < numVertices; ++i) {
284         readOutEdges(dataInput, partition);
285       }
286       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
287       // around.
288       boolean shouldDeleteEdges = false;
289       if (!conf.isStaticGraph() ||
290           oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
291         shouldDeleteEdges = true;
292       }
293       numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
294       index.removeLastIndex();
295       partitionStore.addPartition(partition);
296     }
297     return numBytes;
298   }
299 
300   @Override
301   protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException {
302     return WritableUtils.readExtendedDataOutput(in, conf);
303   }
304 
305   @Override
306   protected void addEntryToInMemoryPartitionData(int partitionId,
307                                                  ExtendedDataOutput vertices) {
308     if (!partitionStore.hasPartition(partitionId)) {
309       oocEngine.getMetaPartitionManager().addPartition(partitionId);
310     }
311     partitionStore.addPartitionVertices(partitionId, vertices);
312   }
313 
314   @Override
315   public long loadPartitionData(int partitionId)
316       throws IOException {
317     return loadPartitionDataProxy(partitionId,
318         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
319   }
320 
321   @Override
322   public long offloadPartitionData(int partitionId)
323       throws IOException {
324     return offloadPartitionDataProxy(partitionId,
325         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
326   }
327 
328   /**
329    * Writes vertex data (Id, value and halted state) to stream.
330    *
331    * @param output The output stream
332    * @param vertex The vertex to serialize
333    * @throws IOException
334    */
335   private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
336       throws IOException {
337     vertex.getId().write(output);
338     V value = vertex.getValue();
339     if (value != null) {
340       output.writeBoolean(false);
341       value.write(output);
342     } else {
343       output.writeBoolean(true);
344     }
345     output.writeBoolean(vertex.isHalted());
346   }
347 
348   /**
349    * Writes vertex edges (Id, edges) to stream.
350    *
351    * @param output The output stream
352    * @param vertex The vertex to serialize
353    * @throws IOException
354    */
355   private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
356       throws IOException {
357     vertex.getId().write(output);
358     OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
359     edges.write(output);
360   }
361 
362   @Override
363   protected long offloadInMemoryPartitionData(
364       int partitionId, int ioThreadId, DataIndex index) throws IOException {
365     long numBytes = 0;
366     if (partitionStore.hasPartition(partitionId)) {
367       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
368       partitionVertexCount.put(partitionId,
369           partitionStore.getPartitionVertexCount(partitionId));
370       partitionEdgeCount.put(partitionId,
371           partitionStore.getPartitionEdgeCount(partitionId));
372       Partition<I, V, E> partition =
373           partitionStore.removePartition(partitionId);
374       LOG.debug(
375           "Offloading partition " + partition + " DataIndex[" + index + "]");
376       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
377       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
378           dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
379       DataOutput dataOutput = outputWrapper.getDataOutput();
380       dataOutput.writeLong(partition.getVertexCount());
381       for (Vertex<I, V, E> vertex : partition) {
382         writeVertexData(dataOutput, vertex);
383       }
384       numBytes += outputWrapper.finalizeOutput();
385       index.removeLastIndex();
386       // Avoid writing back edges if we have already written them once and
387       // the graph is not changing.
388       // If we are in the input superstep, we need to write the files
389       // at least the first time, even though the graph is static.
390       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
391       if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
392           !conf.isStaticGraph() ||
393           !dataAccessor.dataExist(ioThreadId, index)) {
394         outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
395             false);
396         for (Vertex<I, V, E> vertex : partition) {
397           writeOutEdges(outputWrapper.getDataOutput(), vertex);
398         }
399         numBytes += outputWrapper.finalizeOutput();
400       }
401       index.removeLastIndex();
402       hasPartitionDataOnFile.add(partitionId);
403     }
404     return numBytes;
405   }
406 
407   @Override
408   protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
409       throws IOException {
410     WritableUtils.writeExtendedDataOutput(vertices, out);
411   }
412 
413   @Override
414   public long offloadBuffers(int partitionId)
415       throws IOException {
416     return offloadBuffersProxy(partitionId,
417         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
418   }
419 
420   @Override
421   protected int entrySerializedSize(ExtendedDataOutput vertices) {
422     return vertices.getPos();
423   }
424 }