1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.messages.primitives.long_id;
20
21 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
23
24 import java.util.List;
25
26 import org.apache.giraph.comm.messages.PartitionSplitInfo;
27 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28 import org.apache.giraph.factories.MessageValueFactory;
29 import org.apache.giraph.graph.Vertex;
30 import org.apache.giraph.partition.Partition;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Writable;
33
34 /**
 * Special message store to be used when ids are LongWritable and no combiner
 * is used.
 * Uses fastutil primitive maps to reduce the number of allocated objects and
 * improve performance.
39 *
40 * @param <M> message type
41 * @param <L> list type
42 */
43 public abstract class LongAbstractListStore<M extends Writable,
44 L extends List> extends LongAbstractStore<M, L> {
45 /**
46 * Map used to store messages for nascent vertices i.e., ones
47 * that did not exist at the start of current superstep but will get
48 * created because of sending message to a non-existent vertex id
49 */
50 private final
51 Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
52
53 /**
54 * Constructor
55 *
56 * @param messageValueFactory Factory for creating message values
57 * @param partitionInfo Partition split info
58 * @param config Hadoop configuration
59 */
60 public LongAbstractListStore(
61 MessageValueFactory<M> messageValueFactory,
62 PartitionSplitInfo<LongWritable> partitionInfo,
63 ImmutableClassesGiraphConfiguration<LongWritable,
64 Writable, Writable> config) {
65 super(messageValueFactory, partitionInfo, config);
66 populateMap();
67
68 // create map for vertex ids (i.e., nascent vertices) not known yet
69 nascentMap = new Int2ObjectOpenHashMap<>();
70 for (int partitionId : partitionInfo.getPartitionIds()) {
71 nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
72 }
73 }
74
75 /**
76 * Populate the map with all vertexIds for each partition
77 */
78 private void populateMap() { // TODO - can parallelize?
79 // populate with vertex ids already known
80 partitionInfo.startIteration();
81 while (true) {
82 Partition partition = partitionInfo.getNextPartition();
83 if (partition == null) {
84 break;
85 }
86 Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
87 for (Object obj : partition) {
88 Vertex vertex = (Vertex) obj;
89 LongWritable vertexId = (LongWritable) vertex.getId();
90 partitionMap.put(vertexId.get(), createList());
91 }
92 partitionInfo.putPartition(partition);
93 }
94 }
95
96 /**
97 * Create an instance of L
98 * @return instance of L
99 */
100 protected abstract L createList();
101
102 /**
103 * Get list for the current vertexId
104 *
105 * @param vertexId vertex id
106 * @return list for current vertexId
107 */
108 protected L getList(LongWritable vertexId) {
109 long id = vertexId.get();
110 int partitionId = partitionInfo.getPartitionId(vertexId);
111 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
112 L list = partitionMap.get(id);
113 if (list == null) {
114 Long2ObjectOpenHashMap<L> nascentPartitionMap =
115 nascentMap.get(partitionId);
116 // assumption: not many nascent vertices are created
117 // so overall synchronization is negligible
118 synchronized (nascentPartitionMap) {
119 list = nascentPartitionMap.get(id);
120 if (list == null) {
121 list = createList();
122 nascentPartitionMap.put(id, list);
123 }
124 return list;
125 }
126 }
127 return list;
128 }
129
130 @Override
131 public void finalizeStore() {
132 for (int partitionId : nascentMap.keySet()) {
133 // nascent vertices are present only in nascent map
134 map.get(partitionId).putAll(nascentMap.get(partitionId));
135 }
136 nascentMap.clear();
137 }
138
139 @Override
140 public boolean hasMessagesForVertex(LongWritable vertexId) {
141 int partitionId = partitionInfo.getPartitionId(vertexId);
142 Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
143 L list = partitionMap.get(vertexId.get());
144 if (list != null && !list.isEmpty()) {
145 return true;
146 }
147 Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
148 return nascentMessages != null &&
149 nascentMessages.containsKey(vertexId.get());
150 }
151
152 // TODO - discussion
153 /*
154 some approaches for ensuring correctness with parallel inserts
155 - current approach: uses a small extra bit of memory by pre-populating
156 map & pushes everything map cannot handle to nascentMap
157 at the beginning of next superstep compute a single threaded finalizeStore is
158 called (so little extra memory + 1 sequential finish ops)
159 - used striped parallel fast utils instead (unsure of perf)
160 - use concurrent map (every get gets far slower)
161 - use reader writer locks (unsure of perf)
162 (code looks something like underneath)
163
164 private final ReadWriteLock rwl = new ReentrantReadWriteLock();
165 rwl.readLock().lock();
166 L list = partitionMap.get(vertexId);
167 if (list == null) {
168 rwl.readLock().unlock();
169 rwl.writeLock().lock();
170 if (partitionMap.get(vertexId) == null) {
171 list = createList();
172 partitionMap.put(vertexId, list);
173 }
174 rwl.readLock().lock();
175 rwl.writeLock().unlock();
176 }
177 rwl.readLock().unlock();
178 - adopted from the article
179 http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
180 ReentrantReadWriteLock.html
181 */
182 }