This project has retired. For details please refer to its Attic page.
LongByteMappingStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.mapping;
20  
21  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
22  
23  import java.util.Arrays;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import javax.annotation.concurrent.ThreadSafe;
29  
30  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
31  import org.apache.giraph.conf.GiraphConstants;
32  import org.apache.hadoop.io.ByteWritable;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.log4j.Logger;
36  
37  import com.google.common.collect.MapMaker;
38  
39  /**
40   *
41   * An implementation of MappingStore<LongWritable, ByteWritable>
42   *
43   * Methods implemented here are thread safe by default because it is guaranteed
44   * that each entry is written to only once.
45   * It can represent up to a maximum of 254 workers
46   * any byte passed is treated as unsigned
47   */
48  @ThreadSafe
49  public class LongByteMappingStore
50    extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
51    Writable> implements MappingStore<LongWritable, ByteWritable> {
52    /** Logger instance */
53    private static final Logger LOG = Logger.getLogger(
54      LongByteMappingStore.class);
55  
56    /** Counts number of entries added */
57    private final AtomicLong numEntries = new AtomicLong(0);
58  
59    /** Id prefix to bytesArray index mapping */
60    private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
61    /** Primitive idToBytes for faster querying */
62    private Long2ObjectOpenHashMap<byte[]> idToBytes;
63    /** Number of lower order bits */
64    private int lower;
65    /** Number of distinct prefixes */
66    private int upper;
67    /** Bit mask for lowerOrder suffix bits */
68    private int lowerBitMask;
69    /** LowerOrder bits count */
70    private int lowerOrder;
71  
72    @Override
73    public void initialize() {
74      upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
75      lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
76  
77      if ((lower & (lower - 1)) != 0) {
78        throw new IllegalStateException("lower not a power of two");
79      }
80  
81      lowerBitMask = lower - 1;
82      lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)
83      concurrentIdToBytes = new MapMaker()
84          .initialCapacity(upper)
85          .concurrencyLevel(getConf().getNumInputSplitsThreads())
86          .makeMap();
87      idToBytes = new Long2ObjectOpenHashMap<>(upper);
88    }
89  
90    /**
91     * Auxiliary method to be used by getTarget
92     *
93     * @param vertexId vertexId
94     * @return return byte value of target
95     */
96    public byte getByteTarget(LongWritable vertexId) {
97      long key = vertexId.get() >>> lowerOrder;
98      int suffix = (int) (vertexId.get() & lowerBitMask);
99      if (!idToBytes.containsKey(key)) {
100       return -1;
101     }
102     return idToBytes.get(key)[suffix];
103   }
104 
105   @Override
106   public void addEntry(LongWritable vertexId, ByteWritable target) {
107     long key = vertexId.get() >>> lowerOrder;
108     byte[] bytes = concurrentIdToBytes.get(key);
109     if (bytes == null) {
110       byte[] newBytes = new byte[lower];
111       Arrays.fill(newBytes, (byte) -1);
112       bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
113       if (bytes == null) {
114         bytes = newBytes;
115       }
116     }
117     bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
118     numEntries.getAndIncrement(); // increment count
119   }
120 
121   @Override
122   public ByteWritable getTarget(LongWritable vertexId,
123     ByteWritable target) {
124     byte bval = getByteTarget(vertexId);
125     if (bval == -1) { // worker not assigned by mapping
126       return null;
127     }
128     target.set(bval);
129     return target;
130   }
131 
132   @Override
133   public void postFilling() {
134     // not thread-safe
135     for (Map.Entry<Long, byte[]> entry : concurrentIdToBytes.entrySet()) {
136       idToBytes.put(entry.getKey(), entry.getValue());
137     }
138     concurrentIdToBytes.clear();
139     concurrentIdToBytes = null;
140   }
141 
142   @Override
143   public long getStats() {
144     return numEntries.longValue();
145   }
146 }