This project has retired. For details please refer to its Attic page.
LongAbstractListStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
23  
24  import java.util.List;
25  
26  import org.apache.giraph.comm.messages.PartitionSplitInfo;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.factories.MessageValueFactory;
29  import org.apache.giraph.graph.Vertex;
30  import org.apache.giraph.partition.Partition;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.hadoop.io.Writable;
33  
34  /**
35   * Special message store to be used when ids are LongWritable and no combiner
36   * is used.
37   * Uses fastutil primitive maps in order to decrease number of objects and
38   * get better performance.
39   *
40   * @param <M> message type
41   * @param <L> list type
42   */
43  public abstract class LongAbstractListStore<M extends Writable,
44    L extends List> extends LongAbstractStore<M, L> {
45    /**
46     * Map used to store messages for nascent vertices i.e., ones
47     * that did not exist at the start of current superstep but will get
48     * created because of sending message to a non-existent vertex id
49     */
50    private final
51    Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
52  
53    /**
54     * Constructor
55     *
56     * @param messageValueFactory Factory for creating message values
57     * @param partitionInfo       Partition split info
58     * @param config              Hadoop configuration
59     */
60    public LongAbstractListStore(
61        MessageValueFactory<M> messageValueFactory,
62        PartitionSplitInfo<LongWritable> partitionInfo,
63        ImmutableClassesGiraphConfiguration<LongWritable,
64            Writable, Writable> config) {
65      super(messageValueFactory, partitionInfo, config);
66      populateMap();
67  
68      // create map for vertex ids (i.e., nascent vertices) not known yet
69      nascentMap = new Int2ObjectOpenHashMap<>();
70      for (int partitionId : partitionInfo.getPartitionIds()) {
71        nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
72      }
73    }
74  
75    /**
76     * Populate the map with all vertexIds for each partition
77     */
78    private void populateMap() { // TODO - can parallelize?
79      // populate with vertex ids already known
80      partitionInfo.startIteration();
81      while (true) {
82        Partition partition = partitionInfo.getNextPartition();
83        if (partition == null) {
84          break;
85        }
86        Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
87        for (Object obj : partition) {
88          Vertex vertex = (Vertex) obj;
89          LongWritable vertexId = (LongWritable) vertex.getId();
90          partitionMap.put(vertexId.get(), createList());
91        }
92        partitionInfo.putPartition(partition);
93      }
94    }
95  
96    /**
97     * Create an instance of L
98     * @return instance of L
99     */
100   protected abstract L createList();
101 
102   /**
103    * Get list for the current vertexId
104    *
105    * @param vertexId vertex id
106    * @return list for current vertexId
107    */
108   protected L getList(LongWritable vertexId) {
109     long id = vertexId.get();
110     int partitionId = partitionInfo.getPartitionId(vertexId);
111     Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
112     L list = partitionMap.get(id);
113     if (list == null) {
114       Long2ObjectOpenHashMap<L> nascentPartitionMap =
115         nascentMap.get(partitionId);
116       // assumption: not many nascent vertices are created
117       // so overall synchronization is negligible
118       synchronized (nascentPartitionMap) {
119         list = nascentPartitionMap.get(id);
120         if (list == null) {
121           list = createList();
122           nascentPartitionMap.put(id, list);
123         }
124         return list;
125       }
126     }
127     return list;
128   }
129 
130   @Override
131   public void finalizeStore() {
132     for (int partitionId : nascentMap.keySet()) {
133       // nascent vertices are present only in nascent map
134       map.get(partitionId).putAll(nascentMap.get(partitionId));
135     }
136     nascentMap.clear();
137   }
138 
139   @Override
140   public boolean hasMessagesForVertex(LongWritable vertexId) {
141     int partitionId = partitionInfo.getPartitionId(vertexId);
142     Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
143     L list = partitionMap.get(vertexId.get());
144     if (list != null && !list.isEmpty()) {
145       return true;
146     }
147     Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
148     return nascentMessages != null &&
149            nascentMessages.containsKey(vertexId.get());
150   }
151 
152   // TODO - discussion
153   /*
154   some approaches for ensuring correctness with parallel inserts
155   - current approach: uses a small extra bit of memory by pre-populating
156   map & pushes everything map cannot handle to nascentMap
157   at the beginning of next superstep compute a single threaded finalizeStore is
158   called (so little extra memory + 1 sequential finish ops)
159   - used striped parallel fast utils instead (unsure of perf)
160   - use concurrent map (every get gets far slower)
161   - use reader writer locks (unsure of perf)
162   (code looks something like underneath)
163 
164       private final ReadWriteLock rwl = new ReentrantReadWriteLock();
165       rwl.readLock().lock();
166       L list = partitionMap.get(vertexId);
167       if (list == null) {
168         rwl.readLock().unlock();
169         rwl.writeLock().lock();
170         if (partitionMap.get(vertexId) == null) {
171           list = createList();
172           partitionMap.put(vertexId, list);
173         }
174         rwl.readLock().lock();
175         rwl.writeLock().unlock();
176       }
177       rwl.readLock().unlock();
178   - adopted from the article
179     http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
180     ReentrantReadWriteLock.html
181    */
182 }