View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.comm.messages.MessageStore;
23  import org.apache.giraph.comm.messages.MessagesIterable;
24  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
25  import org.apache.giraph.factories.MessageValueFactory;
26  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
27  import org.apache.giraph.utils.VertexIdMessageIterator;
28  import org.apache.giraph.utils.VertexIdMessages;
29  import org.apache.giraph.utils.EmptyIterable;
30  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
31  import org.apache.giraph.utils.io.DataInputOutput;
32  import org.apache.hadoop.io.IntWritable;
33  import org.apache.hadoop.io.Writable;
34  
35  import com.google.common.collect.Lists;
36  
37  import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
38  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
39  import it.unimi.dsi.fastutil.ints.IntIterator;
40  import it.unimi.dsi.fastutil.objects.ObjectIterator;
41  
42  import java.io.DataInput;
43  import java.io.DataOutput;
44  import java.io.IOException;
45  import java.util.List;
46  
47  /**
48   * Special message store to be used when ids are IntWritable and no combiner
49   * is used.
50   * Uses fastutil primitive maps in order to decrease number of objects and
51   * get better performance.
52   *
53   * @param <M> Message type
54   */
55  public class IntByteArrayMessageStore<M extends Writable>
56      implements MessageStore<IntWritable, M> {
57    /** Message value factory */
58    protected final MessageValueFactory<M> messageValueFactory;
59    /** Map from partition id to map from vertex id to message */
60    private final
61    Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>> map;
62    /** Service worker */
63    private final CentralizedServiceWorker<IntWritable, ?, ?> service;
64    /** Giraph configuration */
65    private final ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config;
66  
67    /**
68     * Constructor
69     *
70     * @param messageValueFactory Factory for creating message values
71     * @param service      Service worker
72     * @param config       Hadoop configuration
73     */
74    public IntByteArrayMessageStore(
75        MessageValueFactory<M> messageValueFactory,
76        CentralizedServiceWorker<IntWritable, Writable, Writable> service,
77        ImmutableClassesGiraphConfiguration<IntWritable, Writable, Writable>
78          config) {
79      this.messageValueFactory = messageValueFactory;
80      this.service = service;
81      this.config = config;
82  
83      map =
84          new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
85      for (int partitionId : service.getPartitionStore().getPartitionIds()) {
86        Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
87            new Int2ObjectOpenHashMap<DataInputOutput>(
88                (int) service.getPartitionStore()
89                    .getPartitionVertexCount(partitionId));
90        map.put(partitionId, partitionMap);
91      }
92    }
93  
94    @Override
95    public boolean isPointerListEncoding() {
96      return false;
97    }
98  
99    /**
100    * Get map which holds messages for partition which vertex belongs to.
101    *
102    * @param vertexId Id of the vertex
103    * @return Map which holds messages for partition which vertex belongs to.
104    */
105   private Int2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
106       IntWritable vertexId) {
107     return map.get(service.getPartitionId(vertexId));
108   }
109 
110   /**
111    * Get the DataInputOutput for a vertex id, creating if necessary.
112    *
113    * @param partitionMap Partition map to look in
114    * @param vertexId     Id of the vertex
115    * @return DataInputOutput for this vertex id (created if necessary)
116    */
117   private DataInputOutput getDataInputOutput(
118       Int2ObjectOpenHashMap<DataInputOutput> partitionMap,
119       int vertexId) {
120     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
121     if (dataInputOutput == null) {
122       dataInputOutput = config.createMessagesInputOutput();
123       partitionMap.put(vertexId, dataInputOutput);
124     }
125     return dataInputOutput;
126   }
127 
128   @Override
129   public void addPartitionMessages(int partitionId,
130       VertexIdMessages<IntWritable, M> messages) {
131     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
132         map.get(partitionId);
133     synchronized (partitionMap) {
134       VertexIdMessageBytesIterator<IntWritable, M>
135           vertexIdMessageBytesIterator =
136           messages.getVertexIdMessageBytesIterator();
137       // Try to copy the message buffer over rather than
138       // doing a deserialization of a message just to know its size.  This
139       // should be more efficient for complex objects where serialization is
140       // expensive.  If this type of iterator is not available, fall back to
141       // deserializing/serializing the messages
142       if (vertexIdMessageBytesIterator != null) {
143         while (vertexIdMessageBytesIterator.hasNext()) {
144           vertexIdMessageBytesIterator.next();
145           DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
146               vertexIdMessageBytesIterator.getCurrentVertexId().get());
147           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
148               dataInputOutput.getDataOutput());
149         }
150       } else {
151         try {
152           VertexIdMessageIterator<IntWritable, M>
153               iterator = messages.getVertexIdMessageIterator();
154           while (iterator.hasNext()) {
155             iterator.next();
156             DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
157                 iterator.getCurrentVertexId().get());
158             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
159                 dataInputOutput.getDataOutput());
160           }
161         } catch (IOException e) {
162           throw new RuntimeException("addPartitionMessages: IOException while" +
163               " adding messages for a partition: " + e);
164         }
165       }
166     }
167   }
168 
169   @Override
170   public void finalizeStore() {
171   }
172 
173   @Override
174   public void clearPartition(int partitionId) {
175     map.get(partitionId).clear();
176   }
177 
178   @Override
179   public boolean hasMessagesForVertex(IntWritable vertexId) {
180     return getPartitionMap(vertexId).containsKey(vertexId.get());
181   }
182 
183   @Override
184   public boolean hasMessagesForPartition(int partitionId) {
185     Int2ObjectOpenHashMap<DataInputOutput> partitionMessages =
186         map.get(partitionId);
187     return partitionMessages != null && !partitionMessages.isEmpty();
188   }
189 
190   @Override
191   public Iterable<M> getVertexMessages(
192       IntWritable vertexId) {
193     DataInputOutput dataInputOutput =
194         getPartitionMap(vertexId).get(vertexId.get());
195     if (dataInputOutput == null) {
196       return EmptyIterable.get();
197     } else {
198       return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
199     }
200   }
201 
202   @Override
203   public void clearVertexMessages(IntWritable vertexId) {
204     getPartitionMap(vertexId).remove(vertexId.get());
205   }
206 
207   @Override
208   public void clearAll() {
209     map.clear();
210   }
211 
212   @Override
213   public Iterable<IntWritable> getPartitionDestinationVertices(
214       int partitionId) {
215     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
216         map.get(partitionId);
217     List<IntWritable> vertices =
218         Lists.newArrayListWithCapacity(partitionMap.size());
219     IntIterator iterator = partitionMap.keySet().iterator();
220     while (iterator.hasNext()) {
221       vertices.add(new IntWritable(iterator.nextInt()));
222     }
223     return vertices;
224   }
225 
226   @Override
227   public void writePartition(DataOutput out,
228       int partitionId) throws IOException {
229     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
230         map.get(partitionId);
231     out.writeInt(partitionMap.size());
232     ObjectIterator<Int2ObjectMap.Entry<DataInputOutput>> iterator =
233         partitionMap.int2ObjectEntrySet().fastIterator();
234     while (iterator.hasNext()) {
235       Int2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
236       out.writeInt(entry.getIntKey());
237       entry.getValue().write(out);
238     }
239   }
240 
241   @Override
242   public void readFieldsForPartition(DataInput in,
243       int partitionId) throws IOException {
244     int size = in.readInt();
245     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
246         new Int2ObjectOpenHashMap<DataInputOutput>(size);
247     while (size-- > 0) {
248       int vertexId = in.readInt();
249       DataInputOutput dataInputOutput = config.createMessagesInputOutput();
250       dataInputOutput.readFields(in);
251       partitionMap.put(vertexId, dataInputOutput);
252     }
253     synchronized (map) {
254       map.put(partitionId, partitionMap);
255     }
256   }
257 }