This project has retired. For details please refer to its Attic page.
LongDoubleMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
23  import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
24  import it.unimi.dsi.fastutil.longs.LongIterator;
25  import it.unimi.dsi.fastutil.objects.ObjectIterator;
26  
27  import java.io.DataInput;
28  import java.io.DataOutput;
29  import java.io.IOException;
30  import java.util.Collections;
31  import java.util.List;
32  
33  import org.apache.giraph.combiner.MessageCombiner;
34  import org.apache.giraph.comm.messages.MessageStore;
35  import org.apache.giraph.comm.messages.PartitionSplitInfo;
36  import org.apache.giraph.utils.EmptyIterable;
37  import org.apache.giraph.utils.VertexIdMessageIterator;
38  import org.apache.giraph.utils.VertexIdMessages;
39  import org.apache.hadoop.io.DoubleWritable;
40  import org.apache.hadoop.io.LongWritable;
41  
42  import com.google.common.collect.Lists;
43  
44  /**
45   * Special message store to be used when ids are LongWritable and messages
46   * are DoubleWritable and messageCombiner is used.
47   * Uses fastutil primitive maps in order to decrease number of objects and
48   * get better performance.
49   */
50  public class LongDoubleMessageStore
51      implements MessageStore<LongWritable, DoubleWritable> {
52    /** Map from partition id to map from vertex id to message */
53    private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
54    /** Message messageCombiner */
55    private final
56    MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
57    /** Service worker */
58    private final PartitionSplitInfo<LongWritable> partitionInfo;
59  
60    /**
61     * Constructor
62     *
63     * @param partitionInfo Partition split info
64     * @param messageCombiner Message messageCombiner
65     */
66    public LongDoubleMessageStore(
67      PartitionSplitInfo<LongWritable> partitionInfo,
68      MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
69    ) {
70      this.partitionInfo = partitionInfo;
71      this.messageCombiner = messageCombiner;
72  
73      map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
74      for (int partitionId : partitionInfo.getPartitionIds()) {
75        Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
76            (int) partitionInfo.getPartitionVertexCount(partitionId));
77        map.put(partitionId, partitionMap);
78      }
79    }
80  
81    @Override
82    public boolean isPointerListEncoding() {
83      return false;
84    }
85  
86    /**
87     * Get map which holds messages for partition which vertex belongs to.
88     *
89     * @param vertexId Id of the vertex
90     * @return Map which holds messages for partition which vertex belongs to.
91     */
92    private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
93      return map.get(partitionInfo.getPartitionId(vertexId));
94    }
95  
96    @Override
97    public void addPartitionMessages(int partitionId,
98        VertexIdMessages<LongWritable, DoubleWritable> messages) {
99      LongWritable reusableVertexId = new LongWritable();
100     DoubleWritable reusableMessage = new DoubleWritable();
101     DoubleWritable reusableCurrentMessage = new DoubleWritable();
102 
103     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
104     synchronized (partitionMap) {
105       VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
106         messages.getVertexIdMessageIterator();
107       while (iterator.hasNext()) {
108         iterator.next();
109         long vertexId = iterator.getCurrentVertexId().get();
110         double message = iterator.getCurrentMessage().get();
111         if (partitionMap.containsKey(vertexId)) {
112           reusableVertexId.set(vertexId);
113           reusableMessage.set(message);
114           reusableCurrentMessage.set(partitionMap.get(vertexId));
115           messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
116               reusableMessage);
117           message = reusableCurrentMessage.get();
118         }
119         // FIXME: messageCombiner should create an initial message instead
120         partitionMap.put(vertexId, message);
121       }
122     }
123   }
124 
125   @Override
126   public void addMessage(
127     LongWritable vertexId,
128     DoubleWritable message
129   ) throws IOException {
130     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
131     synchronized (partitionMap) {
132       double originalValue = partitionMap.get(vertexId.get());
133       DoubleWritable originalMessage = new DoubleWritable(originalValue);
134       messageCombiner.combine(vertexId, originalMessage, message);
135       partitionMap.put(vertexId.get(), originalMessage.get());
136     }
137   }
138 
139   @Override
140   public void finalizeStore() {
141   }
142 
143   @Override
144   public void clearPartition(int partitionId) {
145     map.get(partitionId).clear();
146   }
147 
148   @Override
149   public boolean hasMessagesForVertex(LongWritable vertexId) {
150     return getPartitionMap(vertexId).containsKey(vertexId.get());
151   }
152 
153   @Override
154   public boolean hasMessagesForPartition(int partitionId) {
155     Long2DoubleOpenHashMap partitionMessages = map.get(partitionId);
156     return partitionMessages != null && !partitionMessages.isEmpty();
157   }
158 
159   @Override
160   public Iterable<DoubleWritable> getVertexMessages(
161       LongWritable vertexId) {
162     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
163     if (!partitionMap.containsKey(vertexId.get())) {
164       return EmptyIterable.get();
165     } else {
166       return Collections.singleton(
167           new DoubleWritable(partitionMap.get(vertexId.get())));
168     }
169   }
170 
171   @Override
172   public void clearVertexMessages(LongWritable vertexId) {
173     getPartitionMap(vertexId).remove(vertexId.get());
174   }
175 
176   @Override
177   public void clearAll() {
178     map.clear();
179   }
180 
181   @Override
182   public Iterable<LongWritable> getPartitionDestinationVertices(
183       int partitionId) {
184     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
185     List<LongWritable> vertices =
186         Lists.newArrayListWithCapacity(partitionMap.size());
187     LongIterator iterator = partitionMap.keySet().iterator();
188     while (iterator.hasNext()) {
189       vertices.add(new LongWritable(iterator.nextLong()));
190     }
191     return vertices;
192   }
193 
194   @Override
195   public void writePartition(DataOutput out,
196       int partitionId) throws IOException {
197     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
198     out.writeInt(partitionMap.size());
199     ObjectIterator<Long2DoubleMap.Entry> iterator =
200         partitionMap.long2DoubleEntrySet().fastIterator();
201     while (iterator.hasNext()) {
202       Long2DoubleMap.Entry entry = iterator.next();
203       out.writeLong(entry.getLongKey());
204       out.writeDouble(entry.getDoubleValue());
205     }
206   }
207 
208   @Override
209   public void readFieldsForPartition(DataInput in,
210       int partitionId) throws IOException {
211     int size = in.readInt();
212     Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(size);
213     while (size-- > 0) {
214       long vertexId = in.readLong();
215       double message = in.readDouble();
216       partitionMap.put(vertexId, message);
217     }
218     synchronized (map) {
219       map.put(partitionId, partitionMap);
220     }
221   }
222 }