This project has retired. For details please refer to its Attic page.
DiskBackedEdgeStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.data;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.edge.EdgeStore;
23  import org.apache.giraph.ooc.OutOfCoreEngine;
24  import org.apache.giraph.ooc.persistence.DataIndex;
25  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
26  import org.apache.giraph.utils.ByteArrayVertexIdEdges;
27  import org.apache.giraph.utils.VertexIdEdges;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.log4j.Logger;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  
36  /**
37   * Implementation of an edge-store used for out-of-core mechanism.
38   *
39   * @param <I> Vertex id
40   * @param <V> Vertex data
41   * @param <E> Edge data
42   */
43  public class DiskBackedEdgeStore<I extends WritableComparable,
44      V extends Writable, E extends Writable>
45      extends DiskBackedDataStore<VertexIdEdges<I, E>>
46      implements EdgeStore<I, V, E> {
47    /** Class logger. */
48    private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
49    /** In-memory message store */
50    private final EdgeStore<I, V, E> edgeStore;
51    /** Configuration */
52    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
53  
54    /**
55     * Constructor
56     *
57     * @param edgeStore In-memory edge store for which out-of-core edge store
58     *                  would be a wrapper
59     * @param conf Configuration
60     * @param oocEngine Out-of-core engine
61     */
62    public DiskBackedEdgeStore(
63        EdgeStore<I, V, E> edgeStore,
64        ImmutableClassesGiraphConfiguration<I, V, E> conf,
65        OutOfCoreEngine oocEngine) {
66      super(conf, oocEngine);
67      this.edgeStore = edgeStore;
68      this.conf = conf;
69    }
70  
71    @Override
72    public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
73      addEntry(partitionId, edges);
74    }
75  
76    @Override
77    public void moveEdgesToVertices() {
78      edgeStore.moveEdgesToVertices();
79    }
80  
81    @Override
82    public void writePartitionEdgeStore(int partitionId, DataOutput output)
83        throws IOException {
84      // This method is only called (should only be called) on in-memory edge
85      // stores
86      throw new IllegalStateException("writePartitionEdgeStore: this method " +
87          "should not be called for DiskBackedEdgeStore!");
88    }
89  
90    @Override
91    public void readPartitionEdgeStore(int partitionId, DataInput input)
92        throws IOException {
93      // This method is only called (should only be called) on in-memory edge
94      // stores
95      throw new IllegalStateException("readPartitionEdgeStore: this method " +
96          "should not be called for DiskBackedEdgeStore!");
97    }
98  
99    @Override
100   public boolean hasEdgesForPartition(int partitionId) {
101     // This method is only called (should only be called) on in-memory edge
102     // stores
103     throw new IllegalStateException("hasEdgesForPartition: this method " +
104         "should not be called for DiskBackedEdgeStore!");
105   }
106 
107   @Override
108   public long loadPartitionData(int partitionId)
109       throws IOException {
110     return loadPartitionDataProxy(partitionId,
111         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
112   }
113 
114   @Override
115   public long offloadPartitionData(int partitionId)
116       throws IOException {
117     return offloadPartitionDataProxy(partitionId,
118         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
119   }
120 
121   @Override
122   public long offloadBuffers(int partitionId)
123       throws IOException {
124     return offloadBuffersProxy(partitionId,
125         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
126   }
127 
128   @Override
129   protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
130       throws IOException {
131     edges.write(out);
132   }
133 
134   @Override
135   protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException {
136     VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
137     vertexIdEdges.setConf(conf);
138     vertexIdEdges.readFields(in);
139     return vertexIdEdges;
140   }
141 
142   @Override
143   protected long loadInMemoryPartitionData(
144       int partitionId, int ioThreadId, DataIndex index) throws IOException {
145     long numBytes = 0;
146     if (hasPartitionDataOnFile.remove(partitionId)) {
147       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
148           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
149       edgeStore.readPartitionEdgeStore(partitionId,
150           inputWrapper.getDataInput());
151       numBytes = inputWrapper.finalizeInput(true);
152     }
153     return numBytes;
154   }
155 
156   @Override
157   protected long offloadInMemoryPartitionData(
158       int partitionId, int ioThreadId, DataIndex index) throws IOException {
159     long numBytes = 0;
160     if (edgeStore.hasEdgesForPartition(partitionId)) {
161       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
162           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
163               false);
164       edgeStore.writePartitionEdgeStore(partitionId,
165           outputWrapper.getDataOutput());
166       numBytes = outputWrapper.finalizeOutput();
167       hasPartitionDataOnFile.add(partitionId);
168     }
169     return numBytes;
170   }
171 
172   @Override
173   protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
174     return edges.getSerializedSize();
175   }
176 
177   @Override
178   protected void addEntryToInMemoryPartitionData(int partitionId,
179                                                  VertexIdEdges<I, E> edges) {
180     oocEngine.getMetaPartitionManager().addPartition(partitionId);
181     edgeStore.addPartitionEdges(partitionId, edges);
182   }
183 }