View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.primitives;
19  
20  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.giraph.bsp.CentralizedServiceWorker;
29  import org.apache.giraph.comm.messages.MessageStore;
30  import org.apache.giraph.comm.messages.MessagesIterable;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.factories.MessageValueFactory;
33  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
34  import org.apache.giraph.types.ops.TypeOpsUtils;
35  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
36  import org.apache.giraph.types.ops.collections.WritableWriter;
37  import org.apache.giraph.utils.EmptyIterable;
38  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
39  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
40  import org.apache.giraph.utils.VertexIdMessageIterator;
41  import org.apache.giraph.utils.VertexIdMessages;
42  import org.apache.giraph.utils.io.DataInputOutput;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  
46  import com.google.common.collect.Lists;
47  
48  /**
49   * Special message store to be used when IDs are primitive and no combiner is
50   * used.
51   * Data is backed by primitive maps in order to decrease number of objects and
52   * get better performance.
53   *
54   * @param <I> Vertex id type
55   * @param <M> Message type
56   */
57  public class IdByteArrayMessageStore<I extends WritableComparable,
58      M extends Writable> implements MessageStore<I, M> {
59    /** Message value factory */
60    protected final MessageValueFactory<M> messageValueFactory;
61    /** Map from partition id to map from vertex id to message */
62    private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
63    /** Service worker */
64    private final CentralizedServiceWorker<I, ?, ?> service;
65    /** Giraph configuration */
66    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
67    /** Vertex id TypeOps */
68    private final PrimitiveIdTypeOps<I> idTypeOps;
69    /** WritableWriter for values in this message store */
70    private final WritableWriter<DataInputOutput>
71    dataInputOutputWriter = new WritableWriter<DataInputOutput>() {
72      @Override
73      public DataInputOutput readFields(DataInput in) throws IOException {
74        DataInputOutput dataInputOutput = config.createMessagesInputOutput();
75        dataInputOutput.readFields(in);
76        return dataInputOutput;
77      }
78  
79      @Override
80      public void write(DataOutput out, DataInputOutput value)
81        throws IOException {
82        value.write(out);
83      }
84    };
85  
86    /**
87     * Constructor
88     *
89     * @param messageValueFactory Factory for creating message values
90     * @param service Service worker
91     * @param config Hadoop configuration
92     */
93    public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
94        CentralizedServiceWorker<I, ?, ?> service,
95        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
96      this.messageValueFactory = messageValueFactory;
97      this.service = service;
98      this.config = config;
99  
100     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
101 
102     map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
103     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
104       Basic2ObjectMap<I, DataInputOutput> partitionMap =
105           idTypeOps.create2ObjectOpenHashMap(
106               Math.max(10,
107                   (int) service.getPartitionStore()
108                       .getPartitionVertexCount(partitionId)),
109               dataInputOutputWriter);
110 
111       map.put(partitionId, partitionMap);
112     }
113   }
114 
115   /**
116    * Get map which holds messages for partition which vertex belongs to.
117    *
118    * @param vertexId Id of the vertex
119    * @return Map which holds messages for partition which vertex belongs to.
120    */
121   private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
122     return map.get(service.getPartitionId(vertexId));
123   }
124 
125   /**
126    * Get the DataInputOutput for a vertex id, creating if necessary.
127    *
128    * @param partitionMap Partition map to look in
129    * @param vertexId Id of the vertex
130    * @return DataInputOutput for this vertex id (created if necessary)
131    */
132   private DataInputOutput getDataInputOutput(
133       Basic2ObjectMap<I, DataInputOutput> partitionMap,
134       I vertexId) {
135     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
136     if (dataInputOutput == null) {
137       dataInputOutput = config.createMessagesInputOutput();
138       partitionMap.put(vertexId, dataInputOutput);
139     }
140     return dataInputOutput;
141   }
142 
143   @Override
144   public void addPartitionMessages(int partitionId,
145       VertexIdMessages<I, M> messages) {
146     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
147     synchronized (partitionMap) {
148       VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
149           messages.getVertexIdMessageBytesIterator();
150       // Try to copy the message buffer over rather than
151       // doing a deserialization of a message just to know its size. This
152       // should be more efficient for complex objects where serialization is
153       // expensive. If this type of iterator is not available, fall back to
154       // deserializing/serializing the messages
155       if (vertexIdMessageBytesIterator != null) {
156         while (vertexIdMessageBytesIterator.hasNext()) {
157           vertexIdMessageBytesIterator.next();
158           DataInputOutput dataInputOutput = getDataInputOutput(
159               partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId());
160           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
161               dataInputOutput.getDataOutput());
162         }
163       } else {
164         try {
165           VertexIdMessageIterator<I, M> iterator =
166               messages.getVertexIdMessageIterator();
167           while (iterator.hasNext()) {
168             iterator.next();
169             DataInputOutput dataInputOutput =
170                 getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
171 
172             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
173                 dataInputOutput.getDataOutput());
174           }
175         } catch (IOException e) {
176           throw new RuntimeException("addPartitionMessages: IOException while" +
177               " adding message for a partition: " + e);
178         }
179       }
180     }
181   }
182 
183   @Override
184   public void clearPartition(int partitionId) {
185     map.get(partitionId).clear();
186   }
187 
188   @Override
189   public boolean hasMessagesForVertex(I vertexId) {
190     return getPartitionMap(vertexId).containsKey(vertexId);
191   }
192 
193   @Override
194   public boolean hasMessagesForPartition(int partitionId) {
195     Basic2ObjectMap<I, DataInputOutput> partitionMessages =
196         map.get(partitionId);
197     return partitionMessages != null && partitionMessages.size() != 0;
198   }
199 
200   @Override
201   public Iterable<M> getVertexMessages(I vertexId) {
202     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
203     if (dataInputOutput == null) {
204       return EmptyIterable.get();
205     } else {
206       return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
207     }
208   }
209 
210   @Override
211   public void clearVertexMessages(I vertexId) {
212     getPartitionMap(vertexId).remove(vertexId);
213   }
214 
215   @Override
216   public void clearAll() {
217     map.clear();
218   }
219 
220   @Override
221   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
222     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
223     List<I> vertices = Lists.newArrayListWithCapacity(partitionMap.size());
224     Iterator<I> iterator = partitionMap.fastKeyIterator();
225     while (iterator.hasNext()) {
226       vertices.add(idTypeOps.createCopy(iterator.next()));
227     }
228     return vertices;
229   }
230 
231   @Override
232   public void writePartition(DataOutput out, int partitionId)
233     throws IOException {
234     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
235     partitionMap.write(out);
236   }
237 
238   @Override
239   public void readFieldsForPartition(DataInput in, int partitionId)
240     throws IOException {
241     Basic2ObjectMap<I, DataInputOutput> partitionMap =
242         idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter);
243     partitionMap.readFields(in);
244     synchronized (map) {
245       map.put(partitionId, partitionMap);
246     }
247   }
248 
249   @Override
250   public void finalizeStore() {
251   }
252 
253   @Override
254   public boolean isPointerListEncoding() {
255     return false;
256   }
257 }