This project has retired. For details please refer to its Attic page.
IntFloatMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives;
20  
21  import com.google.common.collect.Lists;
22  
23  import it.unimi.dsi.fastutil.ints.Int2FloatMap;
24  import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
25  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
26  import it.unimi.dsi.fastutil.ints.IntIterator;
27  import it.unimi.dsi.fastutil.objects.ObjectIterator;
28  
29  import java.io.DataInput;
30  import java.io.DataOutput;
31  import java.io.IOException;
32  import java.util.Collections;
33  import java.util.List;
34  
35  import org.apache.giraph.combiner.MessageCombiner;
36  import org.apache.giraph.comm.messages.MessageStore;
37  import org.apache.giraph.comm.messages.PartitionSplitInfo;
38  import org.apache.giraph.utils.EmptyIterable;
39  import org.apache.giraph.utils.VertexIdMessageIterator;
40  import org.apache.giraph.utils.VertexIdMessages;
41  import org.apache.hadoop.io.FloatWritable;
42  import org.apache.hadoop.io.IntWritable;
43  
44  /**
45   * Special message store to be used when ids are IntWritable and messages
46   * are FloatWritable and messageCombiner is used.
47   * Uses fastutil primitive maps in order to decrease number of objects and
48   * get better performance.
49   */
50  public class IntFloatMessageStore
51      implements MessageStore<IntWritable, FloatWritable> {
52    /** Map from partition id to map from vertex id to message */
53    private final Int2ObjectOpenHashMap<Int2FloatOpenHashMap> map;
54    /** Message messageCombiner */
55    private final
56    MessageCombiner<? super IntWritable, FloatWritable> messageCombiner;
57    /** Partition split info */
58    private final PartitionSplitInfo<IntWritable> partitionInfo;
59  
60    /**
61     * Constructor
62     *
63     * @param partitionInfo Partition split info
64     * @param messageCombiner Message messageCombiner
65     */
66    public IntFloatMessageStore(
67      PartitionSplitInfo<IntWritable> partitionInfo,
68      MessageCombiner<? super IntWritable, FloatWritable> messageCombiner
69    ) {
70      this.partitionInfo = partitionInfo;
71      this.messageCombiner = messageCombiner;
72  
73      map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
74      for (int partitionId : partitionInfo.getPartitionIds()) {
75        Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
76            (int) partitionInfo.getPartitionVertexCount(partitionId));
77        map.put(partitionId, partitionMap);
78      }
79    }
80  
81    @Override
82    public boolean isPointerListEncoding() {
83      return false;
84    }
85  
86    /**
87     * Get map which holds messages for partition which vertex belongs to.
88     *
89     * @param vertexId Id of the vertex
90     * @return Map which holds messages for partition which vertex belongs to.
91     */
92    private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
93      return map.get(partitionInfo.getPartitionId(vertexId));
94    }
95  
96    @Override
97    public void addPartitionMessages(int partitionId,
98        VertexIdMessages<IntWritable, FloatWritable> messages) {
99      IntWritable reusableVertexId = new IntWritable();
100     FloatWritable reusableMessage = new FloatWritable();
101     FloatWritable reusableCurrentMessage = new FloatWritable();
102 
103     Int2FloatOpenHashMap partitionMap = map.get(partitionId);
104     synchronized (partitionMap) {
105       VertexIdMessageIterator<IntWritable, FloatWritable>
106           iterator = messages.getVertexIdMessageIterator();
107       while (iterator.hasNext()) {
108         iterator.next();
109         int vertexId = iterator.getCurrentVertexId().get();
110         float message = iterator.getCurrentMessage().get();
111         if (partitionMap.containsKey(vertexId)) {
112           reusableVertexId.set(vertexId);
113           reusableMessage.set(message);
114           reusableCurrentMessage.set(partitionMap.get(vertexId));
115           messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
116               reusableMessage);
117           message = reusableCurrentMessage.get();
118         }
119         // FIXME: messageCombiner should create an initial message instead
120         partitionMap.put(vertexId, message);
121       }
122     }
123   }
124 
125   @Override
126   public void addMessage(
127     IntWritable vertexId,
128     FloatWritable message
129   ) throws IOException {
130     Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
131     synchronized (partitionMap) {
132       float originalValue = partitionMap.get(vertexId.get());
133       FloatWritable originalMessage = new FloatWritable(originalValue);
134       messageCombiner.combine(vertexId, originalMessage, message);
135       partitionMap.put(vertexId.get(), originalMessage.get());
136     }
137   }
138 
139   @Override
140   public void finalizeStore() {
141   }
142 
143   @Override
144   public void clearPartition(int partitionId) {
145     map.get(partitionId).clear();
146   }
147 
148   @Override
149   public boolean hasMessagesForVertex(IntWritable vertexId) {
150     return getPartitionMap(vertexId).containsKey(vertexId.get());
151   }
152 
153   @Override
154   public boolean hasMessagesForPartition(int partitionId) {
155     Int2FloatOpenHashMap partitionMessages = map.get(partitionId);
156     return partitionMessages != null && !partitionMessages.isEmpty();
157   }
158 
159   @Override
160   public Iterable<FloatWritable> getVertexMessages(
161       IntWritable vertexId) {
162     Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
163     if (!partitionMap.containsKey(vertexId.get())) {
164       return EmptyIterable.get();
165     } else {
166       return Collections.singleton(
167           new FloatWritable(partitionMap.get(vertexId.get())));
168     }
169   }
170 
171   @Override
172   public void clearVertexMessages(IntWritable vertexId) {
173     getPartitionMap(vertexId).remove(vertexId.get());
174   }
175 
176   @Override
177   public void clearAll() {
178     map.clear();
179   }
180 
181   @Override
182   public Iterable<IntWritable> getPartitionDestinationVertices(
183       int partitionId) {
184     Int2FloatOpenHashMap partitionMap = map.get(partitionId);
185     List<IntWritable> vertices =
186         Lists.newArrayListWithCapacity(partitionMap.size());
187     IntIterator iterator = partitionMap.keySet().iterator();
188     while (iterator.hasNext()) {
189       vertices.add(new IntWritable(iterator.nextInt()));
190     }
191     return vertices;
192   }
193 
194   @Override
195   public void writePartition(DataOutput out,
196       int partitionId) throws IOException {
197     Int2FloatOpenHashMap partitionMap = map.get(partitionId);
198     out.writeInt(partitionMap.size());
199     ObjectIterator<Int2FloatMap.Entry> iterator =
200         partitionMap.int2FloatEntrySet().fastIterator();
201     while (iterator.hasNext()) {
202       Int2FloatMap.Entry entry = iterator.next();
203       out.writeInt(entry.getIntKey());
204       out.writeFloat(entry.getFloatValue());
205     }
206   }
207 
208   @Override
209   public void readFieldsForPartition(DataInput in,
210       int partitionId) throws IOException {
211     int size = in.readInt();
212     Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(size);
213     while (size-- > 0) {
214       int vertexId = in.readInt();
215       float message = in.readFloat();
216       partitionMap.put(vertexId, message);
217     }
218     synchronized (map) {
219       map.put(partitionId, partitionMap);
220     }
221   }
222 }