This project has retired. For details please refer to its Attic page.
IdByteArrayMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.primitives;
19  
20  import com.google.common.collect.Lists;
21  
22  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.util.Iterator;
28  import java.util.List;
29  
30  import org.apache.giraph.comm.messages.MessageStore;
31  import org.apache.giraph.comm.messages.MessagesIterable;
32  import org.apache.giraph.comm.messages.PartitionSplitInfo;
33  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
34  import org.apache.giraph.factories.MessageValueFactory;
35  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
36  import org.apache.giraph.types.ops.TypeOpsUtils;
37  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
38  import org.apache.giraph.types.ops.collections.WritableWriter;
39  import org.apache.giraph.utils.EmptyIterable;
40  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
41  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
42  import org.apache.giraph.utils.VertexIdMessageIterator;
43  import org.apache.giraph.utils.VertexIdMessages;
44  import org.apache.giraph.utils.io.DataInputOutput;
45  import org.apache.hadoop.io.Writable;
46  import org.apache.hadoop.io.WritableComparable;
47  
48  /**
49   * Special message store to be used when IDs are primitive and no combiner is
50   * used.
51   * Data is backed by primitive maps in order to decrease number of objects and
52   * get better performance.
53   *
54   * @param <I> Vertex id type
55   * @param <M> Message type
56   */
57  public class IdByteArrayMessageStore<I extends WritableComparable,
58      M extends Writable> implements MessageStore<I, M> {
59    /** Message value factory */
60    protected final MessageValueFactory<M> messageValueFactory;
61    /** Map from partition id to map from vertex id to message */
62    private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
63    /** Partition split info */
64    private final PartitionSplitInfo<I> partitionInfo;
65    /** Giraph configuration */
66    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
67    /** Vertex id TypeOps */
68    private final PrimitiveIdTypeOps<I> idTypeOps;
69    /** WritableWriter for values in this message store */
70    private final WritableWriter<DataInputOutput>
71    dataInputOutputWriter = new WritableWriter<DataInputOutput>() {
72      @Override
73      public DataInputOutput readFields(DataInput in) throws IOException {
74        DataInputOutput dataInputOutput = config.createMessagesInputOutput();
75        dataInputOutput.readFields(in);
76        return dataInputOutput;
77      }
78  
79      @Override
80      public void write(DataOutput out, DataInputOutput value)
81        throws IOException {
82        value.write(out);
83      }
84    };
85  
86    /**
87     * Constructor
88     *
89     * @param messageValueFactory Factory for creating message values
90     * @param partitionInfo Partition split info
91     * @param config Hadoop configuration
92     */
93    public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
94      PartitionSplitInfo<I> partitionInfo,
95      ImmutableClassesGiraphConfiguration<I, ?, ?> config
96    ) {
97      this.messageValueFactory = messageValueFactory;
98      this.partitionInfo = partitionInfo;
99      this.config = config;
100 
101     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
102 
103     map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
104     for (int partitionId : partitionInfo.getPartitionIds()) {
105       int capacity = Math.max(10,
106         (int) partitionInfo.getPartitionVertexCount(partitionId));
107       Basic2ObjectMap<I, DataInputOutput> partitionMap =
108         idTypeOps.create2ObjectOpenHashMap(
109           capacity,
110           dataInputOutputWriter);
111 
112       map.put(partitionId, partitionMap);
113     }
114   }
115 
116   /**
117    * Get map which holds messages for partition which vertex belongs to.
118    *
119    * @param vertexId Id of the vertex
120    * @return Map which holds messages for partition which vertex belongs to.
121    */
122   private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
123     return map.get(partitionInfo.getPartitionId(vertexId));
124   }
125 
126   /**
127    * Get the DataInputOutput for a vertex id, creating if necessary.
128    *
129    * @param partitionMap Partition map to look in
130    * @param vertexId Id of the vertex
131    * @return DataInputOutput for this vertex id (created if necessary)
132    */
133   private DataInputOutput getDataInputOutput(
134       Basic2ObjectMap<I, DataInputOutput> partitionMap,
135       I vertexId) {
136     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
137     if (dataInputOutput == null) {
138       dataInputOutput = config.createMessagesInputOutput();
139       partitionMap.put(vertexId, dataInputOutput);
140     }
141     return dataInputOutput;
142   }
143 
144   @Override
145   public void addPartitionMessages(int partitionId,
146       VertexIdMessages<I, M> messages) {
147     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
148     synchronized (partitionMap) {
149       VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
150           messages.getVertexIdMessageBytesIterator();
151       // Try to copy the message buffer over rather than
152       // doing a deserialization of a message just to know its size. This
153       // should be more efficient for complex objects where serialization is
154       // expensive. If this type of iterator is not available, fall back to
155       // deserializing/serializing the messages
156       if (vertexIdMessageBytesIterator != null) {
157         while (vertexIdMessageBytesIterator.hasNext()) {
158           vertexIdMessageBytesIterator.next();
159           DataInputOutput dataInputOutput = getDataInputOutput(
160               partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId());
161           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
162               dataInputOutput.getDataOutput());
163         }
164       } else {
165         try {
166           VertexIdMessageIterator<I, M> iterator =
167               messages.getVertexIdMessageIterator();
168           while (iterator.hasNext()) {
169             iterator.next();
170             DataInputOutput dataInputOutput =
171                 getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
172 
173             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
174                 dataInputOutput.getDataOutput());
175           }
176         } catch (IOException e) {
177           throw new RuntimeException("addPartitionMessages: IOException while" +
178               " adding message for a partition: " + e);
179         }
180       }
181     }
182   }
183 
184   /**
185    * Adds a message for a particular vertex
186    *
187    * @param vertexId Id of target vertex
188    * @param message  A message to send
189    * @throws IOException
190    */
191   @Override
192   public void addMessage(I vertexId, M message) throws IOException {
193     Basic2ObjectMap<I, DataInputOutput> partitionMap =
194       getPartitionMap(vertexId);
195     synchronized (partitionMap) {
196       DataInputOutput dataInputOutput = getDataInputOutput(
197         partitionMap, vertexId);
198       VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
199         vertexId, message, dataInputOutput.getDataOutput());
200     }
201   }
202 
203   @Override
204   public void clearPartition(int partitionId) {
205     map.get(partitionId).clear();
206   }
207 
208   @Override
209   public boolean hasMessagesForVertex(I vertexId) {
210     return getPartitionMap(vertexId).containsKey(vertexId);
211   }
212 
213   @Override
214   public boolean hasMessagesForPartition(int partitionId) {
215     Basic2ObjectMap<I, DataInputOutput> partitionMessages =
216         map.get(partitionId);
217     return partitionMessages != null && partitionMessages.size() != 0;
218   }
219 
220   @Override
221   public Iterable<M> getVertexMessages(I vertexId) {
222     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
223     if (dataInputOutput == null) {
224       return EmptyIterable.get();
225     } else {
226       return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
227     }
228   }
229 
230   @Override
231   public void clearVertexMessages(I vertexId) {
232     getPartitionMap(vertexId).remove(vertexId);
233   }
234 
235   @Override
236   public void clearAll() {
237     map.clear();
238   }
239 
240   @Override
241   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
242     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
243     List<I> vertices = Lists.newArrayListWithCapacity(partitionMap.size());
244     Iterator<I> iterator = partitionMap.fastKeyIterator();
245     while (iterator.hasNext()) {
246       vertices.add(idTypeOps.createCopy(iterator.next()));
247     }
248     return vertices;
249   }
250 
251   @Override
252   public void writePartition(DataOutput out, int partitionId)
253     throws IOException {
254     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
255     partitionMap.write(out);
256   }
257 
258   @Override
259   public void readFieldsForPartition(DataInput in, int partitionId)
260     throws IOException {
261     Basic2ObjectMap<I, DataInputOutput> partitionMap =
262         idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter);
263     partitionMap.readFields(in);
264     synchronized (map) {
265       map.put(partitionId, partitionMap);
266     }
267   }
268 
269   @Override
270   public void finalizeStore() {
271   }
272 
273   @Override
274   public boolean isPointerListEncoding() {
275     return false;
276   }
277 }