View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
23  import org.apache.giraph.bsp.CentralizedServiceWorker;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.factories.MessageValueFactory;
26  import org.apache.giraph.graph.Vertex;
27  import org.apache.giraph.partition.Partition;
28  import org.apache.giraph.partition.PartitionOwner;
29  import org.apache.giraph.utils.VertexIdIterator;
30  import org.apache.hadoop.io.LongWritable;
31  import org.apache.hadoop.io.Writable;
32  
33  import java.util.List;
34  
35  /**
36   * Special message store to be used when ids are LongWritable and no combiner
37   * is used.
38   * Uses fastutil primitive maps in order to decrease number of objects and
39   * get better performance.
40   *
41   * @param <M> message type
42   * @param <L> list type
43   */
44  public abstract class LongAbstractListMessageStore<M extends Writable,
45    L extends List> extends LongAbstractMessageStore<M, L> {
46    /**
47     * Map used to store messages for nascent vertices i.e., ones
48     * that did not exist at the start of current superstep but will get
49     * created because of sending message to a non-existent vertex id
50     */
51    private final
52    Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
53  
54    /**
55     * Constructor
56     *
57     * @param messageValueFactory Factory for creating message values
58     * @param service             Service worker
59     * @param config              Hadoop configuration
60     */
61    public LongAbstractListMessageStore(
62        MessageValueFactory<M> messageValueFactory,
63        CentralizedServiceWorker<LongWritable, Writable, Writable> service,
64        ImmutableClassesGiraphConfiguration<LongWritable,
65            Writable, Writable> config) {
66      super(messageValueFactory, service, config);
67      populateMap();
68  
69      // create map for vertex ids (i.e., nascent vertices) not known yet
70      nascentMap = new Int2ObjectOpenHashMap<>();
71      for (int partitionId : service.getPartitionStore().getPartitionIds()) {
72        nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
73      }
74    }
75  
76    /**
77     * Populate the map with all vertexIds for each partition
78     */
79    private void populateMap() { // TODO - can parallelize?
80      // populate with vertex ids already known
81      service.getPartitionStore().startIteration();
82      while (true) {
83        Partition partition = service.getPartitionStore().getNextPartition();
84        if (partition == null) {
85          break;
86        }
87        Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
88        for (Object obj : partition) {
89          Vertex vertex = (Vertex) obj;
90          LongWritable vertexId = (LongWritable) vertex.getId();
91          partitionMap.put(vertexId.get(), createList());
92        }
93        service.getPartitionStore().putPartition(partition);
94      }
95    }
96  
97    /**
98     * Create an instance of L
99     * @return instance of L
100    */
101   protected abstract L createList();
102 
103   /**
104    * Get list for the current vertexId
105    *
106    * @param iterator vertexId iterator
107    * @return list for current vertexId
108    */
109   protected L getList(
110     VertexIdIterator<LongWritable> iterator) {
111     PartitionOwner owner =
112         service.getVertexPartitionOwner(iterator.getCurrentVertexId());
113     long vertexId = iterator.getCurrentVertexId().get();
114     int partitionId = owner.getPartitionId();
115     Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
116     if (!partitionMap.containsKey(vertexId)) {
117       synchronized (nascentMap) {
118         // assumption: not many nascent vertices are created
119         // so overall synchronization is negligible
120         Long2ObjectOpenHashMap<L> nascentPartitionMap =
121           nascentMap.get(partitionId);
122         if (nascentPartitionMap.get(vertexId) == null) {
123           nascentPartitionMap.put(vertexId, createList());
124         }
125         return nascentPartitionMap.get(vertexId);
126       }
127     }
128     return partitionMap.get(vertexId);
129   }
130 
131   @Override
132   public void finalizeStore() {
133     for (int partitionId : nascentMap.keySet()) {
134       // nascent vertices are present only in nascent map
135       map.get(partitionId).putAll(nascentMap.get(partitionId));
136     }
137     nascentMap.clear();
138   }
139 
140   // TODO - discussion
141   /*
142   some approaches for ensuring correctness with parallel inserts
143   - current approach: uses a small extra bit of memory by pre-populating
144   map & pushes everything map cannot handle to nascentMap
145   at the beginning of next superstep compute a single threaded finalizeStore is
146   called (so little extra memory + 1 sequential finish ops)
147   - used striped parallel fast utils instead (unsure of perf)
148   - use concurrent map (every get gets far slower)
149   - use reader writer locks (unsure of perf)
150   (code looks something like underneath)
151 
152       private final ReadWriteLock rwl = new ReentrantReadWriteLock();
153       rwl.readLock().lock();
154       L list = partitionMap.get(vertexId);
155       if (list == null) {
156         rwl.readLock().unlock();
157         rwl.writeLock().lock();
158         if (partitionMap.get(vertexId) == null) {
159           list = createList();
160           partitionMap.put(vertexId, list);
161         }
162         rwl.readLock().lock();
163         rwl.writeLock().unlock();
164       }
165       rwl.readLock().unlock();
166   - adopted from the article
167     http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
168     ReentrantReadWriteLock.html
169    */
170 }