This project has retired. For details please refer to its Attic page.
ExtendedByteArrayOutputBuffer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.apache.giraph.conf.FloatConfOption;
26  import org.apache.giraph.conf.GiraphConstants;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.conf.IntConfOption;
29  
30  /**
31   * Wraps a list of byte array outputs and provides convenient
32   * utilities on top of it
33   */
34  public class ExtendedByteArrayOutputBuffer {
35    /**
36     * This option sets the capacity of an
37     * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in
38     * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
39     */
40    public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
41        new IntConfOption("giraph.capacityOfDataOutInBuffer",
42            1024 * GiraphConstants.ONE_KB,
43            "Set the capacity of dataoutputs in dataout buffer");
44  
45    /**
46     * This option sets the maximum fraction of a
47     * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in
48     * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer})
49     * that can be filled
50     */
51    public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
52        new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
53            "Set the maximum fraction of dataoutput capacity allowed to fill");
54  
55    /** Maximum size allowed for one byte array output */
56    private final int maxBufSize;
57    /** Stop writing to buffer after threshold has been reached */
58    private final int threshold;
59    /** Giraph configuration */
60    private final ImmutableClassesGiraphConfiguration<?, ? , ?> config;
61  
62    /** Map of index => byte array outputs */
63    private final Int2ObjectOpenHashMap<ExtendedDataOutput>
64    bytearrayOutputs = new Int2ObjectOpenHashMap<>();
65    /** Size of byte array outputs map */
66    private final AtomicInteger mapSize = new AtomicInteger(0);
67    /** Thread local variable to get hold of a byte array output stream */
68    private final ThreadLocal<IndexAndDataOut> threadLocal =
69      new ThreadLocal<IndexAndDataOut>() {
70        @Override
71        protected IndexAndDataOut initialValue() {
72          return newIndexAndDataOutput();
73        }
74      };
75  
76    /**
77     * Constructor
78     *
79     * @param config configuration
80     */
81    public ExtendedByteArrayOutputBuffer(
82      ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
83      this.config = config;
84  
85      maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
86      threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
87        maxBufSize);
88    }
89  
90    /**
91     * Return threadLocal indexAndDataOutput instance
92     *
93     * @return threadLocal indexAndDataOutput instance
94     */
95    public IndexAndDataOut getIndexAndDataOut() {
96      IndexAndDataOut indexAndDataOut = threadLocal.get();
97      if (indexAndDataOut.dataOutput.getPos() >= threshold) {
98        indexAndDataOut = newIndexAndDataOutput();
99        threadLocal.set(indexAndDataOut);
100     }
101     return indexAndDataOut;
102   }
103 
104   /**
105    * Get dataoutput from bytearrayOutputs
106    *
107    * @param index index in bytearrayOutputs
108    * @return extendeddataoutput at given index
109    */
110   public ExtendedDataOutput getDataOutput(int index) {
111     return bytearrayOutputs.get(index);
112   }
113 
114   /**
115    * Holder for index &amp; DataOutput objects
116    */
117   public static class IndexAndDataOut {
118     /** Index */
119     private final int index;
120     /** Dataouput instance */
121     private final ExtendedDataOutput dataOutput;
122 
123     /**
124      * Constructor
125      *
126      * @param index index in bytearrayOutputs
127      * @param dataOutput dataoutput
128      */
129     public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
130       this.index = index;
131       this.dataOutput = dataOutput;
132     }
133 
134     public int getIndex() {
135       return index;
136     }
137 
138     public ExtendedDataOutput getDataOutput() {
139       return dataOutput;
140     }
141   }
142 
143   /**
144    * Create a new IndexAndDataOutput instance
145    * @return new IndexAndDataOutput instance
146    */
147   private IndexAndDataOut newIndexAndDataOutput() {
148     int index = mapSize.getAndIncrement();
149     ExtendedDataOutput output = config.createExtendedDataOutput(
150         maxBufSize);
151     synchronized (bytearrayOutputs) {
152       bytearrayOutputs.put(index, output);
153     }
154     return new IndexAndDataOut(index, output);
155   }
156 }