View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc.persistence;
20  
21  import org.apache.giraph.conf.GiraphConstants;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.utils.io.BigDataInput;
24  import org.apache.giraph.utils.io.BigDataOutput;
25  
26  import java.io.DataInput;
27  import java.io.DataOutput;
28  import java.io.IOException;
29  import java.util.concurrent.ConcurrentHashMap;
30  
31  /**
32   * Implementation of data accessor which keeps all the data serialized but in
33   * memory. Useful to keep the number of used objects under control.
34   *
35   * TODO currently doesn't reuse any of the byte arrays so could cause more GCs
36   */
37  public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
38    /** Configuration */
39    private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
40    /** DataInputOutput for each DataIndex used */
41    private final ConcurrentHashMap<DataIndex, BigDataOutput> data;
42  
43    /**
44     * Constructor
45     *
46     * @param conf Configuration
47     */
48    public InMemoryDataAccessor(
49        ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
50      this.conf = conf;
51      data = new ConcurrentHashMap<>();
52    }
53  
54    @Override
55    public void initialize() {
56      // No-op
57    }
58  
59    @Override
60    public void shutdown() {
61      // No-op
62    }
63  
64    @Override
65    public int getNumAccessorThreads() {
66      return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
67    }
68  
69    @Override
70    public DataInputWrapper prepareInput(int threadId,
71        DataIndex index) throws IOException {
72      return new InMemoryDataInputWrapper(
73          new BigDataInput(data.get(index)), index);
74    }
75  
76    @Override
77    public DataOutputWrapper prepareOutput(int threadId,
78        DataIndex index, boolean shouldAppend) throws IOException {
79      // Don't need to worry about synchronization here since only one thread
80      // can deal with one index
81      BigDataOutput output = data.get(index);
82      if (output == null || !shouldAppend) {
83        output = new BigDataOutput(conf);
84        data.put(index, output);
85      }
86      return new InMemoryDataOutputWrapper(output);
87    }
88  
89    @Override
90    public boolean dataExist(int threadId, DataIndex index) {
91      return data.containsKey(index);
92    }
93  
94    /**
95     * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
96     */
97    public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
98      /** Output to write data to */
99      private final BigDataOutput output;
100     /** Size of output at the moment it was created */
101     private final long initialSize;
102 
103     /**
104      * Constructor
105      *
106      * @param output Output to write data to
107      */
108     public InMemoryDataOutputWrapper(BigDataOutput output) {
109       this.output = output;
110       initialSize = output.getSize();
111     }
112 
113     @Override
114     public DataOutput getDataOutput() {
115       return output;
116     }
117 
118     @Override
119     public long finalizeOutput() {
120       return output.getSize() - initialSize;
121     }
122   }
123 
124   /**
125    * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
126    */
127   public class InMemoryDataInputWrapper implements DataInputWrapper {
128     /** Input to read data from */
129     private final BigDataInput input;
130     /** DataIndex which this wrapper belongs to */
131     private final DataIndex index;
132 
133     /**
134      * Constructor
135      *
136      * @param input Input to read data from
137      * @param index DataIndex which this wrapper belongs to
138      */
139     public InMemoryDataInputWrapper(
140         BigDataInput input, DataIndex index) {
141       this.input = input;
142       this.index = index;
143     }
144 
145     @Override
146     public DataInput getDataInput() {
147       return input;
148     }
149 
150     @Override
151     public long finalizeInput(boolean deleteOnClose) {
152       if (deleteOnClose) {
153         data.remove(index);
154       }
155       return input.getPos();
156     }
157   }
158 }