This project has retired. For details please refer to its Attic page.
InMemoryDataAccessor xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.persistence;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.conf.IntConfOption;
24  import org.apache.giraph.utils.ExtendedDataOutput;
25  import org.apache.giraph.utils.io.BigDataInput;
26  import org.apache.giraph.utils.io.BigDataOutput;
27  
28  import java.io.DataInput;
29  import java.io.DataOutput;
30  import java.io.IOException;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.LinkedBlockingDeque;
33  
34  /**
35   * Implementation of data accessor which keeps all the data serialized but in
36   * memory. Useful to keep the number of used objects under control.
37   */
38  public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
39    /** Configuration */
40    private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
41    /** Factory for data outputs */
42    private final PooledBigDataOutputFactory outputFactory;
43    /** DataInputOutput for each DataIndex used */
44    private final ConcurrentHashMap<
45        DataIndex, PooledBigDataOutputFactory.Output> data;
46  
47    /**
48     * Constructor
49     *
50     * @param conf Configuration
51     */
52    public InMemoryDataAccessor(
53        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
54      this.conf = conf;
55      outputFactory = new PooledBigDataOutputFactory(conf);
56      data = new ConcurrentHashMap<>();
57    }
58  
59    @Override
60    public void initialize() {
61      // No-op
62    }
63  
64    @Override
65    public void shutdown() {
66      // No-op
67    }
68  
69    @Override
70    public int getNumAccessorThreads() {
71      return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
72    }
73  
74    @Override
75    public DataInputWrapper prepareInput(int threadId,
76        DataIndex index) throws IOException {
77      return new InMemoryDataInputWrapper(
78          new BigDataInput(data.get(index)), index);
79    }
80  
81    @Override
82    public DataOutputWrapper prepareOutput(int threadId,
83        DataIndex index, boolean shouldAppend) throws IOException {
84      // Don't need to worry about synchronization here since only one thread
85      // can deal with one index
86      PooledBigDataOutputFactory.Output output = data.get(index);
87      if (output == null || !shouldAppend) {
88        output = outputFactory.createOutput();
89        data.put(index, output);
90      }
91      return new InMemoryDataOutputWrapper(output);
92    }
93  
94    @Override
95    public boolean dataExist(int threadId, DataIndex index) {
96      return data.containsKey(index);
97    }
98  
99    /**
100    * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
101    */
102   public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
103     /** Output to write data to */
104     private final BigDataOutput output;
105     /** Size of output at the moment it was created */
106     private final long initialSize;
107 
108     /**
109      * Constructor
110      *
111      * @param output Output to write data to
112      */
113     public InMemoryDataOutputWrapper(BigDataOutput output) {
114       this.output = output;
115       initialSize = output.getSize();
116     }
117 
118     @Override
119     public DataOutput getDataOutput() {
120       return output;
121     }
122 
123     @Override
124     public long finalizeOutput() {
125       return output.getSize() - initialSize;
126     }
127   }
128 
129   /**
130    * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
131    */
132   public class InMemoryDataInputWrapper implements DataInputWrapper {
133     /** Input to read data from */
134     private final BigDataInput input;
135     /** DataIndex which this wrapper belongs to */
136     private final DataIndex index;
137 
138     /**
139      * Constructor
140      *
141      * @param input Input to read data from
142      * @param index DataIndex which this wrapper belongs to
143      */
144     public InMemoryDataInputWrapper(
145         BigDataInput input, DataIndex index) {
146       this.input = input;
147       this.index = index;
148     }
149 
150     @Override
151     public DataInput getDataInput() {
152       return input;
153     }
154 
155     @Override
156     public long finalizeInput(boolean deleteOnClose) {
157       if (deleteOnClose) {
158         data.remove(index).returnData();
159       }
160       return input.getPos();
161     }
162   }
163 
164   /**
165    * Factory for pooled big data outputs
166    */
167   private static class PooledBigDataOutputFactory {
168     /** How big pool of byte arrays to keep */
169     public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
170         new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
171             "How big pool of byte arrays to keep");
172     /** How big byte arrays to make */
173     public static final IntConfOption BYTE_ARRAY_SIZE =
174         new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
175             "How big byte arrays to make");
176 
177     /** Configuration */
178     private final ImmutableClassesGiraphConfiguration conf;
179     /** Pool of reusable byte[] */
180     private final LinkedBlockingDeque<byte[]> byteArrayPool;
181     /** How big byte arrays to make */
182     private final int byteArraySize;
183 
184     /**
185      * Constructor
186      *
187      * @param conf Configuration
188      */
189     public PooledBigDataOutputFactory(
190         ImmutableClassesGiraphConfiguration conf) {
191       this.conf = conf;
192       byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
193       byteArraySize = BYTE_ARRAY_SIZE.get(conf);
194     }
195 
196     /**
197      * Create new output to write to
198      *
199      * @return Output to write to
200      */
201     public Output createOutput() {
202       return new Output(conf);
203     }
204 
205     /**
206      * Implementation of BigDataOutput
207      */
208     private class Output extends BigDataOutput {
209       /**
210        * Constructor
211        *
212        * @param conf Configuration
213        */
214       public Output(ImmutableClassesGiraphConfiguration conf) {
215         super(conf);
216       }
217 
218       /**
219        * Return all data structures related to this data output.
220        * Can't use the same instance after this call anymore.
221        */
222       protected void returnData() {
223         if (dataOutputs != null) {
224           for (ExtendedDataOutput dataOutput : dataOutputs) {
225             byteArrayPool.offer(dataOutput.getByteArray());
226           }
227         }
228         byteArrayPool.offer(currentDataOutput.getByteArray());
229       }
230 
231       @Override
232       protected ExtendedDataOutput createOutput(int size) {
233         byte[] data = byteArrayPool.pollLast();
234         return conf.createExtendedDataOutput(
235             data == null ? new byte[byteArraySize] : data, 0);
236       }
237 
238       @Override
239       protected int getMaxSize() {
240         return byteArraySize;
241       }
242     }
243   }
244 }