This project has retired. For details please refer to its Attic page.
ByteArrayMessagesPerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import com.google.common.collect.Iterators;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentMap;
27  
28  import org.apache.giraph.bsp.CentralizedServiceWorker;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.RepresentativeByteStructIterator;
33  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
34  import org.apache.giraph.utils.VertexIdIterator;
35  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
36  import org.apache.giraph.utils.VertexIdMessageIterator;
37  import org.apache.giraph.utils.VertexIdMessages;
38  import org.apache.giraph.utils.WritableUtils;
39  import org.apache.giraph.utils.io.DataInputOutput;
40  import org.apache.hadoop.io.Writable;
41  import org.apache.hadoop.io.WritableComparable;
42  
43  /**
44   * Implementation of {@link SimpleMessageStore} where multiple messages are
45   * stored per vertex as byte backed datastructures.
46   * Used when there is no combiner provided.
47   *
48   * @param <I> Vertex id
49   * @param <M> Message data
50   */
51  public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
52      M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
53    /**
54     * Constructor
55     *
56     * @param messageValueFactory Message class held in the store
57     * @param partitionInfo Partition split info
58     * @param config Hadoop configuration
59     */
60    public ByteArrayMessagesPerVertexStore(
61        MessageValueFactory<M> messageValueFactory,
62        PartitionSplitInfo<I> partitionInfo,
63        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
64      super(messageValueFactory, partitionInfo, config);
65    }
66  
67    @Override
68    public boolean isPointerListEncoding() {
69      return false;
70    }
71  
72    /**
73     * Get the extended data output for a vertex id from the iterator, creating
74     * if necessary.  This method will take ownership of the vertex id from the
75     * iterator if necessary (if used in the partition map entry).
76     *
77     * @param partitionMap Partition map to look in
78     * @param iterator Special iterator that can release ownerhips of vertex ids
79     * @return Extended data output for this vertex id (created if necessary)
80     */
81    private DataInputOutput getDataInputOutput(
82        ConcurrentMap<I, DataInputOutput> partitionMap,
83        VertexIdIterator<I> iterator) {
84      DataInputOutput dataInputOutput =
85          partitionMap.get(iterator.getCurrentVertexId());
86      if (dataInputOutput == null) {
87        DataInputOutput newDataOutput = config.createMessagesInputOutput();
88        dataInputOutput = partitionMap.putIfAbsent(
89            iterator.releaseCurrentVertexId(), newDataOutput);
90        if (dataInputOutput == null) {
91          dataInputOutput = newDataOutput;
92        }
93      }
94      return dataInputOutput;
95    }
96  
97    @Override
98    public void addPartitionMessages(
99      int partitionId, VertexIdMessages<I, M> messages) {
100     ConcurrentMap<I, DataInputOutput> partitionMap =
101         getOrCreatePartitionMap(partitionId);
102     VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
103         messages.getVertexIdMessageBytesIterator();
104     // Try to copy the message buffer over rather than
105     // doing a deserialization of a message just to know its size.  This
106     // should be more efficient for complex objects where serialization is
107     // expensive.  If this type of iterator is not available, fall back to
108     // deserializing/serializing the messages
109     if (vertexIdMessageBytesIterator != null) {
110       while (vertexIdMessageBytesIterator.hasNext()) {
111         vertexIdMessageBytesIterator.next();
112         DataInputOutput dataInputOutput =
113             getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
114 
115         synchronized (dataInputOutput) {
116           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
117               dataInputOutput.getDataOutput());
118         }
119       }
120     } else {
121       try {
122         VertexIdMessageIterator<I, M> vertexIdMessageIterator =
123             messages.getVertexIdMessageIterator();
124         while (vertexIdMessageIterator.hasNext()) {
125           vertexIdMessageIterator.next();
126           DataInputOutput dataInputOutput =
127               getDataInputOutput(partitionMap, vertexIdMessageIterator);
128 
129           synchronized (dataInputOutput) {
130             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
131                 vertexIdMessageIterator, dataInputOutput.getDataOutput());
132           }
133         }
134       } catch (IOException e) {
135         throw new RuntimeException("addPartitionMessages: IOException while" +
136             " adding messages for a partition: " + e);
137       }
138     }
139   }
140 
141   @Override
142   public void addMessage(I vertexId, M message) throws IOException {
143     ConcurrentMap<I, DataInputOutput> partitionMap =
144       getOrCreatePartitionMap(getPartitionId(vertexId));
145     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
146     if (dataInputOutput == null) {
147       DataInputOutput newDataOutput = config.createMessagesInputOutput();
148       I copyId = WritableUtils.createCopy(vertexId);
149       dataInputOutput = partitionMap.putIfAbsent(copyId, newDataOutput);
150       if (dataInputOutput == null) {
151         dataInputOutput = newDataOutput;
152       }
153     }
154 
155     synchronized (dataInputOutput) {
156       VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
157         vertexId, message, dataInputOutput.getDataOutput());
158     }
159   }
160 
161   @Override
162   protected Iterable<M> getMessagesAsIterable(
163       DataInputOutput dataInputOutput) {
164     return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
165   }
166 
167   @Override
168   protected int getNumberOfMessagesIn(
169       ConcurrentMap<I, DataInputOutput> partitionMap) {
170     int numberOfMessages = 0;
171     for (DataInputOutput dataInputOutput : partitionMap.values()) {
172       numberOfMessages += Iterators.size(
173           new RepresentativeByteStructIterator<M>(
174               dataInputOutput.createDataInput()) {
175             @Override
176             protected M createWritable() {
177               return messageValueFactory.newInstance();
178             }
179           });
180     }
181     return numberOfMessages;
182   }
183 
184   @Override
185   protected void writeMessages(DataInputOutput dataInputOutput,
186       DataOutput out) throws IOException {
187     dataInputOutput.write(out);
188   }
189 
190   @Override
191   protected DataInputOutput readFieldsForMessages(DataInput in) throws
192       IOException {
193     DataInputOutput dataInputOutput = config.createMessagesInputOutput();
194     dataInputOutput.readFields(in);
195     return dataInputOutput;
196   }
197 
198   /**
199    * Create new factory for this message store
200    *
201    * @param service Worker service
202    * @param config  Hadoop configuration
203    * @param <I>     Vertex id
204    * @param <M>     Message data
205    * @return Factory
206    */
207   public static <I extends WritableComparable, M extends Writable>
208   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
209       CentralizedServiceWorker<I, ?, ?> service,
210       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
211     return new Factory<I, M>(service, config);
212   }
213 
214   /**
215    * Factory for {@link ByteArrayMessagesPerVertexStore}
216    *
217    * @param <I> Vertex id
218    * @param <M> Message data
219    */
220   public static class Factory<I extends WritableComparable, M extends Writable>
221     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
222     /** Service worker */
223     private PartitionSplitInfo<I> partitionInfo;
224     /** Hadoop configuration */
225     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
226 
227     /** Constructor for reflection */
228     public Factory() { }
229 
230     /**
231      * @param partitionInfo Partition split info
232      * @param config  Hadoop configuration
233      */
234     public Factory(
235       PartitionSplitInfo<I> partitionInfo,
236       ImmutableClassesGiraphConfiguration<I, ?, ?> config
237     ) {
238       this.partitionInfo = partitionInfo;
239       this.config = config;
240     }
241 
242     @Override
243     public MessageStore<I, M> newStore(
244         MessageClasses<I, M> messageClasses) {
245       return new ByteArrayMessagesPerVertexStore<I, M>(
246           messageClasses.createMessageValueFactory(config),
247           partitionInfo, config);
248     }
249 
250     @Override
251     public void initialize(PartitionSplitInfo<I> partitionInfo,
252         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
253       this.partitionInfo = partitionInfo;
254       this.config = conf;
255     }
256   }
257 
258 }