View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.mapping;
20  
21  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
22  
23  import java.util.Arrays;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import javax.annotation.concurrent.ThreadSafe;
28  
29  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
30  import org.apache.giraph.conf.GiraphConstants;
31  import org.apache.hadoop.io.ByteWritable;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.log4j.Logger;
35  
36  import com.google.common.collect.MapMaker;
37  
38  /**
39   *
40   * An implementation of MappingStore<LongWritable, ByteWritable>
41   *
42   * Methods implemented here are thread safe by default because it is guaranteed
43   * that each entry is written to only once.
44   * It can represent up to a maximum of 254 workers
45   * any byte passed is treated as unsigned
46   */
47  @ThreadSafe
48  public class LongByteMappingStore
49    extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
50    Writable> implements MappingStore<LongWritable, ByteWritable> {
51    /** Logger instance */
52    private static final Logger LOG = Logger.getLogger(
53      LongByteMappingStore.class);
54  
55    /** Counts number of entries added */
56    private final AtomicLong numEntries = new AtomicLong(0);
57  
58    /** Id prefix to bytesArray index mapping */
59    private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
60    /** Primitive idToBytes for faster querying */
61    private Long2ObjectOpenHashMap<byte[]> idToBytes;
62    /** Number of lower order bits */
63    private int lower;
64    /** Number of distinct prefixes */
65    private int upper;
66    /** Bit mask for lowerOrder suffix bits */
67    private int lowerBitMask;
68    /** LowerOrder bits count */
69    private int lowerOrder;
70  
71    @Override
72    public void initialize() {
73      upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
74      lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
75  
76      if ((lower & (lower - 1)) != 0) {
77        throw new IllegalStateException("lower not a power of two");
78      }
79  
80      lowerBitMask = lower - 1;
81      lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)
82      concurrentIdToBytes = new MapMaker()
83          .initialCapacity(upper)
84          .concurrencyLevel(getConf().getNumInputSplitsThreads())
85          .makeMap();
86      idToBytes = new Long2ObjectOpenHashMap<>(upper);
87    }
88  
89    /**
90     * Auxiliary method to be used by getTarget
91     *
92     * @param vertexId vertexId
93     * @return return byte value of target
94     */
95    public byte getByteTarget(LongWritable vertexId) {
96      long key = vertexId.get() >>> lowerOrder;
97      int suffix = (int) (vertexId.get() & lowerBitMask);
98      if (!idToBytes.containsKey(key)) {
99        return -1;
100     }
101     return idToBytes.get(key)[suffix];
102   }
103 
104   @Override
105   public void addEntry(LongWritable vertexId, ByteWritable target) {
106     long key = vertexId.get() >>> lowerOrder;
107     byte[] bytes = concurrentIdToBytes.get(key);
108     if (bytes == null) {
109       byte[] newBytes = new byte[lower];
110       Arrays.fill(newBytes, (byte) -1);
111       bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
112       if (bytes == null) {
113         bytes = newBytes;
114       }
115     }
116     bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
117     numEntries.getAndIncrement(); // increment count
118   }
119 
120   @Override
121   public ByteWritable getTarget(LongWritable vertexId,
122     ByteWritable target) {
123     byte bval = getByteTarget(vertexId);
124     if (bval == -1) { // worker not assigned by mapping
125       return null;
126     }
127     target.set(bval);
128     return target;
129   }
130 
131   @Override
132   public void postFilling() {
133     // not thread-safe
134     for (Long id : concurrentIdToBytes.keySet()) {
135       idToBytes.put(id, concurrentIdToBytes.get(id));
136     }
137     concurrentIdToBytes.clear();
138     concurrentIdToBytes = null;
139   }
140 
141   @Override
142   public long getStats() {
143     return numEntries.longValue();
144   }
145 }