/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.concurrent.ConcurrentMap;
25  
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.conf.MessageClasses;
29  import org.apache.giraph.factories.MessageValueFactory;
30  import org.apache.giraph.utils.RepresentativeByteStructIterator;
31  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
32  import org.apache.giraph.utils.VertexIdIterator;
33  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
34  import org.apache.giraph.utils.VertexIdMessageIterator;
35  import org.apache.giraph.utils.VertexIdMessages;
36  import org.apache.giraph.utils.io.DataInputOutput;
37  import org.apache.hadoop.io.Writable;
38  import org.apache.hadoop.io.WritableComparable;
39  
40  import com.google.common.collect.Iterators;
41  
42  /**
43   * Implementation of {@link SimpleMessageStore} where multiple messages are
44   * stored per vertex as byte backed datastructures.
45   * Used when there is no combiner provided.
46   *
47   * @param <I> Vertex id
48   * @param <M> Message data
49   */
50  public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
51      M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
52    /**
53     * Constructor
54     *
55     * @param messageValueFactory Message class held in the store
56     * @param service Service worker
57     * @param config Hadoop configuration
58     */
59    public ByteArrayMessagesPerVertexStore(
60        MessageValueFactory<M> messageValueFactory,
61        CentralizedServiceWorker<I, ?, ?> service,
62        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
63      super(messageValueFactory, service, config);
64    }
65  
66    @Override
67    public boolean isPointerListEncoding() {
68      return false;
69    }
70  
71    /**
72     * Get the extended data output for a vertex id from the iterator, creating
73     * if necessary.  This method will take ownership of the vertex id from the
74     * iterator if necessary (if used in the partition map entry).
75     *
76     * @param partitionMap Partition map to look in
77     * @param iterator Special iterator that can release ownerhips of vertex ids
78     * @return Extended data output for this vertex id (created if necessary)
79     */
80    private DataInputOutput getDataInputOutput(
81        ConcurrentMap<I, DataInputOutput> partitionMap,
82        VertexIdIterator<I> iterator) {
83      DataInputOutput dataInputOutput =
84          partitionMap.get(iterator.getCurrentVertexId());
85      if (dataInputOutput == null) {
86        DataInputOutput newDataOutput = config.createMessagesInputOutput();
87        dataInputOutput = partitionMap.putIfAbsent(
88            iterator.releaseCurrentVertexId(), newDataOutput);
89        if (dataInputOutput == null) {
90          dataInputOutput = newDataOutput;
91        }
92      }
93      return dataInputOutput;
94    }
95  
96    @Override
97    public void addPartitionMessages(
98      int partitionId, VertexIdMessages<I, M> messages) {
99      ConcurrentMap<I, DataInputOutput> partitionMap =
100         getOrCreatePartitionMap(partitionId);
101     VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
102         messages.getVertexIdMessageBytesIterator();
103     // Try to copy the message buffer over rather than
104     // doing a deserialization of a message just to know its size.  This
105     // should be more efficient for complex objects where serialization is
106     // expensive.  If this type of iterator is not available, fall back to
107     // deserializing/serializing the messages
108     if (vertexIdMessageBytesIterator != null) {
109       while (vertexIdMessageBytesIterator.hasNext()) {
110         vertexIdMessageBytesIterator.next();
111         DataInputOutput dataInputOutput =
112             getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
113 
114         synchronized (dataInputOutput) {
115           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
116               dataInputOutput.getDataOutput());
117         }
118       }
119     } else {
120       try {
121         VertexIdMessageIterator<I, M> vertexIdMessageIterator =
122             messages.getVertexIdMessageIterator();
123         while (vertexIdMessageIterator.hasNext()) {
124           vertexIdMessageIterator.next();
125           DataInputOutput dataInputOutput =
126               getDataInputOutput(partitionMap, vertexIdMessageIterator);
127 
128           synchronized (dataInputOutput) {
129             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
130                 vertexIdMessageIterator, dataInputOutput.getDataOutput());
131           }
132         }
133       } catch (IOException e) {
134         throw new RuntimeException("addPartitionMessages: IOException while" +
135             " adding messages for a partition: " + e);
136       }
137     }
138   }
139 
140   @Override
141   protected Iterable<M> getMessagesAsIterable(
142       DataInputOutput dataInputOutput) {
143     return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
144   }
145 
146   @Override
147   protected int getNumberOfMessagesIn(
148       ConcurrentMap<I, DataInputOutput> partitionMap) {
149     int numberOfMessages = 0;
150     for (DataInputOutput dataInputOutput : partitionMap.values()) {
151       numberOfMessages += Iterators.size(
152           new RepresentativeByteStructIterator<M>(
153               dataInputOutput.createDataInput()) {
154             @Override
155             protected M createWritable() {
156               return messageValueFactory.newInstance();
157             }
158           });
159     }
160     return numberOfMessages;
161   }
162 
163   @Override
164   protected void writeMessages(DataInputOutput dataInputOutput,
165       DataOutput out) throws IOException {
166     dataInputOutput.write(out);
167   }
168 
169   @Override
170   protected DataInputOutput readFieldsForMessages(DataInput in) throws
171       IOException {
172     DataInputOutput dataInputOutput = config.createMessagesInputOutput();
173     dataInputOutput.readFields(in);
174     return dataInputOutput;
175   }
176 
177   /**
178    * Create new factory for this message store
179    *
180    * @param service Worker service
181    * @param config  Hadoop configuration
182    * @param <I>     Vertex id
183    * @param <M>     Message data
184    * @return Factory
185    */
186   public static <I extends WritableComparable, M extends Writable>
187   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
188       CentralizedServiceWorker<I, ?, ?> service,
189       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
190     return new Factory<I, M>(service, config);
191   }
192 
193   /**
194    * Factory for {@link ByteArrayMessagesPerVertexStore}
195    *
196    * @param <I> Vertex id
197    * @param <M> Message data
198    */
199   public static class Factory<I extends WritableComparable, M extends Writable>
200     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
201     /** Service worker */
202     private CentralizedServiceWorker<I, ?, ?> service;
203     /** Hadoop configuration */
204     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
205 
206     /** Constructor for reflection */
207     public Factory() { }
208 
209     /**
210      * @param service Worker service
211      * @param config  Hadoop configuration
212      */
213     public Factory(CentralizedServiceWorker<I, ?, ?> service,
214         ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
215       this.service = service;
216       this.config = config;
217     }
218 
219     @Override
220     public MessageStore<I, M> newStore(
221         MessageClasses<I, M> messageClasses) {
222       return new ByteArrayMessagesPerVertexStore<I, M>(
223           messageClasses.createMessageValueFactory(config),
224           service, config);
225     }
226 
227     @Override
228     public void initialize(CentralizedServiceWorker<I, ?, ?> service,
229         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
230       this.service = service;
231       this.config = conf;
232     }
233   }
234 }