This project has retired. For details please refer to its
        
        Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.ooc.data;
20  
21  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22  import org.apache.giraph.edge.EdgeStore;
23  import org.apache.giraph.ooc.OutOfCoreEngine;
24  import org.apache.giraph.ooc.persistence.DataIndex;
25  import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
26  import org.apache.giraph.utils.ByteArrayVertexIdEdges;
27  import org.apache.giraph.utils.VertexIdEdges;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.io.WritableComparable;
30  import org.apache.log4j.Logger;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  
36  
37  
38  
39  
40  
41  
42  
43  public class DiskBackedEdgeStore<I extends WritableComparable,
44      V extends Writable, E extends Writable>
45      extends DiskBackedDataStore<VertexIdEdges<I, E>>
46      implements EdgeStore<I, V, E> {
47    
48    private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
49    
50    private final EdgeStore<I, V, E> edgeStore;
51    
52    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
53  
54    
55  
56  
57  
58  
59  
60  
61  
62    public DiskBackedEdgeStore(
63        EdgeStore<I, V, E> edgeStore,
64        ImmutableClassesGiraphConfiguration<I, V, E> conf,
65        OutOfCoreEngine oocEngine) {
66      super(conf, oocEngine);
67      this.edgeStore = edgeStore;
68      this.conf = conf;
69    }
70  
71    @Override
72    public void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges) {
73      addEntry(partitionId, edges);
74    }
75  
76    @Override
77    public void moveEdgesToVertices() {
78      edgeStore.moveEdgesToVertices();
79    }
80  
81    @Override
82    public void writePartitionEdgeStore(int partitionId, DataOutput output)
83        throws IOException {
84      
85      
86      throw new IllegalStateException("writePartitionEdgeStore: this method " +
87          "should not be called for DiskBackedEdgeStore!");
88    }
89  
90    @Override
91    public void readPartitionEdgeStore(int partitionId, DataInput input)
92        throws IOException {
93      
94      
95      throw new IllegalStateException("readPartitionEdgeStore: this method " +
96          "should not be called for DiskBackedEdgeStore!");
97    }
98  
99    @Override
100   public boolean hasEdgesForPartition(int partitionId) {
101     
102     
103     throw new IllegalStateException("hasEdgesForPartition: this method " +
104         "should not be called for DiskBackedEdgeStore!");
105   }
106 
107   @Override
108   public long loadPartitionData(int partitionId)
109       throws IOException {
110     return loadPartitionDataProxy(partitionId,
111         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
112   }
113 
114   @Override
115   public long offloadPartitionData(int partitionId)
116       throws IOException {
117     return offloadPartitionDataProxy(partitionId,
118         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
119   }
120 
121   @Override
122   public long offloadBuffers(int partitionId)
123       throws IOException {
124     return offloadBuffersProxy(partitionId,
125         new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
126   }
127 
128   @Override
129   protected void writeEntry(VertexIdEdges<I, E> edges, DataOutput out)
130       throws IOException {
131     edges.write(out);
132   }
133 
134   @Override
135   protected VertexIdEdges<I, E> readNextEntry(DataInput in) throws IOException {
136     VertexIdEdges<I, E> vertexIdEdges = new ByteArrayVertexIdEdges<>();
137     vertexIdEdges.setConf(conf);
138     vertexIdEdges.readFields(in);
139     return vertexIdEdges;
140   }
141 
142   @Override
143   protected long loadInMemoryPartitionData(
144       int partitionId, int ioThreadId, DataIndex index) throws IOException {
145     long numBytes = 0;
146     if (hasPartitionDataOnFile.remove(partitionId)) {
147       OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
148           oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
149       edgeStore.readPartitionEdgeStore(partitionId,
150           inputWrapper.getDataInput());
151       numBytes = inputWrapper.finalizeInput(true);
152     }
153     return numBytes;
154   }
155 
156   @Override
157   protected long offloadInMemoryPartitionData(
158       int partitionId, int ioThreadId, DataIndex index) throws IOException {
159     long numBytes = 0;
160     if (edgeStore.hasEdgesForPartition(partitionId)) {
161       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
162           oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
163               false);
164       edgeStore.writePartitionEdgeStore(partitionId,
165           outputWrapper.getDataOutput());
166       numBytes = outputWrapper.finalizeOutput();
167       hasPartitionDataOnFile.add(partitionId);
168     }
169     return numBytes;
170   }
171 
172   @Override
173   protected int entrySerializedSize(VertexIdEdges<I, E> edges) {
174     return edges.getSerializedSize();
175   }
176 
177   @Override
178   protected void addEntryToInMemoryPartitionData(int partitionId,
179                                                  VertexIdEdges<I, E> edges) {
180     oocEngine.getMetaPartitionManager().addPartition(partitionId);
181     edgeStore.addPartitionEdges(partitionId, edges);
182   }
183 }