View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import com.google.common.collect.Maps;
22  import org.apache.giraph.bsp.BspService;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.OutEdges;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.ooc.OutOfCoreEngine;
27  import org.apache.giraph.ooc.persistence.DataIndex;
28  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
29  import org.apache.giraph.partition.Partition;
30  import org.apache.giraph.partition.PartitionStore;
31  import org.apache.giraph.utils.ExtendedDataOutput;
32  import org.apache.giraph.utils.WritableUtils;
33  import org.apache.giraph.worker.BspServiceWorker;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  import org.apache.hadoop.mapreduce.Mapper;
37  import org.apache.log4j.Logger;
38  
39  import java.io.DataInput;
40  import java.io.DataOutput;
41  import java.io.IOException;
42  import java.util.Map;
43  
44  import static com.google.common.base.Preconditions.checkNotNull;
45  
46  /**
47   * Implementation of a partition-store used for out-of-core mechanism.
48   * Partition store is responsible for partition data, as well as data buffers in
49   * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
50   * refers to vertex buffers in INPUT_SUPERSTEP).
51   *
52   * @param <I> Vertex id
53   * @param <V> Vertex data
54   * @param <E> Edge data
55   */
56  public class DiskBackedPartitionStore<I extends WritableComparable,
57      V extends Writable, E extends Writable>
58      extends DiskBackedDataStore<ExtendedDataOutput>
59      implements PartitionStore<I, V, E> {
60    /** Class logger. */
61    private static final Logger LOG =
62        Logger.getLogger(DiskBackedPartitionStore.class);
63    /** Configuration */
64    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
65    /** Job context (for progress) */
66    private final Mapper<?, ?, ?, ?>.Context context;
67    /** In-memory partition store */
68    private final PartitionStore<I, V, E> partitionStore;
69    /**
70     * Keeps number of vertices in partitions, right when they are last spilled
71     * to the disk. This value may be inaccurate for in-memory partitions, but
72     * is accurate for out-of-core partitions.
73     */
74    private final Map<Integer, Long> partitionVertexCount =
75        Maps.newConcurrentMap();
76    /**
77     * Keeps number of edges in partitions, right when they are last spilled
78     * to the disk. This value may be inaccurate for in-memory partitions, but
79     * is accurate for out-of-core partitions.
80     */
81    private final Map<Integer, Long> partitionEdgeCount =
82        Maps.newConcurrentMap();
83  
84    /**
85     * Constructor.
86     *
87     * @param partitionStore In-memory partition store for which out-of-code
88     *                       partition store would be a wrapper
89     * @param conf Configuration
90     * @param context Job context
91     * @param oocEngine Out-of-core engine
92     */
93    public DiskBackedPartitionStore(
94        PartitionStore<I, V, E> partitionStore,
95        ImmutableClassesGiraphConfiguration<I, V, E> conf,
96        Mapper<?, ?, ?, ?>.Context context,
97        OutOfCoreEngine oocEngine) {
98      super(conf, oocEngine);
99      this.partitionStore = partitionStore;
100     this.conf = conf;
101     this.context = context;
102   }
103 
104   @Override
105   public boolean addPartition(Partition<I, V, E> partition) {
106     boolean added = partitionStore.addPartition(partition);
107     if (added) {
108       oocEngine.getMetaPartitionManager()
109           .addPartition(partition.getId());
110     }
111     return added;
112   }
113 
114   @Override
115   public Partition<I, V, E> removePartition(Integer partitionId) {
116     // Set the partition as 'in process' so its data and messages do not get
117     // spilled to disk until the remove is complete.
118     oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId);
119     oocEngine.retrievePartition(partitionId);
120     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
121     checkNotNull(partition, "removePartition: partition " + partitionId +
122         " is not in memory for removal!");
123     oocEngine.getMetaPartitionManager().removePartition(partitionId);
124     return partition;
125   }
126 
127   @Override
128   public boolean hasPartition(Integer partitionId) {
129     return oocEngine.getMetaPartitionManager().hasPartition(partitionId);
130   }
131 
132   @Override
133   public Iterable<Integer> getPartitionIds() {
134     return oocEngine.getMetaPartitionManager().getPartitionIds();
135   }
136 
137   @Override
138   public int getNumPartitions() {
139     return oocEngine.getMetaPartitionManager().getNumPartitions();
140   }
141 
142   @Override
143   public long getPartitionVertexCount(Integer partitionId) {
144     if (partitionStore.hasPartition(partitionId)) {
145       return partitionStore.getPartitionVertexCount(partitionId);
146     } else {
147       return partitionVertexCount.get(partitionId);
148     }
149   }
150 
151   @Override
152   public long getPartitionEdgeCount(Integer partitionId) {
153     if (partitionStore.hasPartition(partitionId)) {
154       return partitionStore.getPartitionEdgeCount(partitionId);
155     } else {
156       return partitionEdgeCount.get(partitionId);
157     }
158   }
159 
160   @Override
161   public boolean isEmpty() {
162     return getNumPartitions() == 0;
163   }
164 
165   @Override
166   public void startIteration() {
167     oocEngine.startIteration();
168   }
169 
170   @Override
171   public Partition<I, V, E> getNextPartition() {
172     Integer partitionId = oocEngine.getNextPartition();
173     if (partitionId == null) {
174       return null;
175     }
176     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
177     if (partition == null) {
178       if (LOG.isInfoEnabled()) {
179         LOG.info("getNextPartition: partition " + partitionId + " is not in " +
180             "the partition store. Creating an empty partition for it.");
181       }
182       partition = conf.createPartition(partitionId, context);
183     }
184     partitionStore.addPartition(partition);
185     return partition;
186   }
187 
188   @Override
189   public void putPartition(Partition<I, V, E> partition) {
190     oocEngine.doneProcessingPartition(partition.getId());
191   }
192 
193   @Override
194   public void addPartitionVertices(Integer partitionId,
195                                    ExtendedDataOutput extendedDataOutput) {
196     addEntry(partitionId, extendedDataOutput);
197   }
198 
199   @Override
200   public void shutdown() {
201     oocEngine.shutdown();
202   }
203 
204   @Override
205   public void initialize() {
206     oocEngine.initialize();
207   }
208 
209   /**
210    * Read vertex data from an input and initialize the vertex.
211    *
212    * @param in     The input stream
213    * @param vertex The vertex to initialize
214    * @throws IOException
215    */
216   private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
217       throws IOException {
218     I id = conf.createVertexId();
219     id.readFields(in);
220     V value = null;
221     boolean hasNullValue = in.readBoolean();
222     if (!hasNullValue) {
223       value = conf.createVertexValue();
224       value.readFields(in);
225     }
226     OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
227     vertex.initialize(id, value, edges);
228     if (in.readBoolean()) {
229       vertex.voteToHalt();
230     } else {
231       vertex.wakeUp();
232     }
233   }
234 
235   /**
236    * Read vertex edges from an input and set them to the vertex.
237    *
238    * @param in        The input stream
239    * @param partition The partition owning the vertex
240    * @throws IOException
241    */
242   private void readOutEdges(DataInput in, Partition<I, V, E> partition)
243       throws IOException {
244     I id = conf.createVertexId();
245     id.readFields(in);
246     Vertex<I, V, E> v = partition.getVertex(id);
247     OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
248     edges.readFields(in);
249     partition.saveVertex(v);
250   }
251 
252   @Override
253   protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
254                                            DataIndex index) throws IOException {
255     long numBytes = 0;
256     // Load vertices
257     if (hasPartitionDataOnFile.remove(partitionId)) {
258       Partition<I, V, E> partition = conf.createPartition(partitionId, context);
259       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
260       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
261       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
262           dataAccessor.prepareInput(ioThreadId, index.copy());
263       DataInput dataInput = inputWrapper.getDataInput();
264       long numVertices = dataInput.readLong();
265       for (long i = 0; i < numVertices; ++i) {
266         Vertex<I, V, E> vertex = conf.createVertex();
267         readVertexData(dataInput, vertex);
268         partition.putVertex(vertex);
269       }
270       numBytes += inputWrapper.finalizeInput(true);
271 
272       // Load edges
273       index.removeLastIndex()
274           .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
275       inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
276       dataInput = inputWrapper.getDataInput();
277       for (int i = 0; i < numVertices; ++i) {
278         readOutEdges(dataInput, partition);
279       }
280       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
281       // around.
282       boolean shouldDeleteEdges = false;
283       if (!conf.isStaticGraph() ||
284           oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
285         shouldDeleteEdges = true;
286       }
287       numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
288       index.removeLastIndex();
289       partitionStore.addPartition(partition);
290     }
291     return numBytes;
292   }
293 
294   @Override
295   protected ExtendedDataOutput readNextEntry(DataInput in) throws IOException {
296     return WritableUtils.readExtendedDataOutput(in, conf);
297   }
298 
299   @Override
300   protected void addEntryToInMemoryPartitionData(int partitionId,
301                                                  ExtendedDataOutput vertices) {
302     if (!partitionStore.hasPartition(partitionId)) {
303       oocEngine.getMetaPartitionManager().addPartition(partitionId);
304     }
305     partitionStore.addPartitionVertices(partitionId, vertices);
306   }
307 
308   @Override
309   public long loadPartitionData(int partitionId)
310       throws IOException {
311     return loadPartitionDataProxy(partitionId,
312         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
313   }
314 
315   @Override
316   public long offloadPartitionData(int partitionId)
317       throws IOException {
318     return offloadPartitionDataProxy(partitionId,
319         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
320   }
321 
322   /**
323    * Writes vertex data (Id, value and halted state) to stream.
324    *
325    * @param output The output stream
326    * @param vertex The vertex to serialize
327    * @throws IOException
328    */
329   private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
330       throws IOException {
331     vertex.getId().write(output);
332     V value = vertex.getValue();
333     if (value != null) {
334       output.writeBoolean(false);
335       value.write(output);
336     } else {
337       output.writeBoolean(true);
338     }
339     output.writeBoolean(vertex.isHalted());
340   }
341 
342   /**
343    * Writes vertex edges (Id, edges) to stream.
344    *
345    * @param output The output stream
346    * @param vertex The vertex to serialize
347    * @throws IOException
348    */
349   private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
350       throws IOException {
351     vertex.getId().write(output);
352     OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
353     edges.write(output);
354   }
355 
356   @Override
357   protected long offloadInMemoryPartitionData(
358       int partitionId, int ioThreadId, DataIndex index) throws IOException {
359     long numBytes = 0;
360     if (partitionStore.hasPartition(partitionId)) {
361       OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
362       partitionVertexCount.put(partitionId,
363           partitionStore.getPartitionVertexCount(partitionId));
364       partitionEdgeCount.put(partitionId,
365           partitionStore.getPartitionEdgeCount(partitionId));
366       Partition<I, V, E> partition =
367           partitionStore.removePartition(partitionId);
368       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
369       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
370           dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
371       DataOutput dataOutput = outputWrapper.getDataOutput();
372       dataOutput.writeLong(partition.getVertexCount());
373       for (Vertex<I, V, E> vertex : partition) {
374         writeVertexData(dataOutput, vertex);
375       }
376       numBytes += outputWrapper.finalizeOutput();
377       index.removeLastIndex();
378       // Avoid writing back edges if we have already written them once and
379       // the graph is not changing.
380       // If we are in the input superstep, we need to write the files
381       // at least the first time, even though the graph is static.
382       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
383       if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
384           !conf.isStaticGraph() ||
385           !dataAccessor.dataExist(ioThreadId, index)) {
386         outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
387             false);
388         for (Vertex<I, V, E> vertex : partition) {
389           writeOutEdges(outputWrapper.getDataOutput(), vertex);
390         }
391         numBytes += outputWrapper.finalizeOutput();
392       }
393       index.removeLastIndex();
394       hasPartitionDataOnFile.add(partitionId);
395     }
396     return numBytes;
397   }
398 
399   @Override
400   protected void writeEntry(ExtendedDataOutput vertices, DataOutput out)
401       throws IOException {
402     WritableUtils.writeExtendedDataOutput(vertices, out);
403   }
404 
405   @Override
406   public long offloadBuffers(int partitionId)
407       throws IOException {
408     return offloadBuffersProxy(partitionId,
409         new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
410   }
411 
412   @Override
413   protected int entrySerializedSize(ExtendedDataOutput vertices) {
414     return vertices.getPos();
415   }
416 }