This project has retired. For details please refer to its Attic page.
OneMessagePerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Collections;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.giraph.bsp.CentralizedServiceWorker;
28  import org.apache.giraph.combiner.MessageCombiner;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.VertexIdMessageIterator;
33  import org.apache.giraph.utils.VertexIdMessages;
34  import org.apache.giraph.utils.WritableUtils;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  
38  /**
39   * Implementation of {@link SimpleMessageStore} where we have a single
40   * message per vertex.
41   * Used when {@link org.apache.giraph.combiner.MessageCombiner} is provided.
42   *
43   * @param <I> Vertex id
44   * @param <M> Message data
45   */
46  public class OneMessagePerVertexStore<I extends WritableComparable,
47      M extends Writable> extends SimpleMessageStore<I, M, M> {
48    /** MessageCombiner for messages */
49    private final MessageCombiner<? super I, M> messageCombiner;
50  
51    /**
52     * @param messageValueFactory Message class held in the store
53     * @param partitionInfo Partition split info
54     * @param messageCombiner MessageCombiner for messages
55     * @param config Hadoop configuration
56     */
57    public OneMessagePerVertexStore(
58      MessageValueFactory<M> messageValueFactory,
59      PartitionSplitInfo<I> partitionInfo,
60      MessageCombiner<? super I, M> messageCombiner,
61      ImmutableClassesGiraphConfiguration<I, ?, ?> config
62    ) {
63      super(messageValueFactory, partitionInfo, config);
64      this.messageCombiner =
65          messageCombiner;
66    }
67  
68    @Override
69    public boolean isPointerListEncoding() {
70      return false;
71    }
72  
73    @Override
74    public void addPartitionMessages(
75        int partitionId,
76        VertexIdMessages<I, M> messages) {
77      ConcurrentMap<I, M> partitionMap =
78          getOrCreatePartitionMap(partitionId);
79      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
80        messages.getVertexIdMessageIterator();
81      // This loop is a little complicated as it is optimized to only create
82      // the minimal amount of vertex id and message objects as possible.
83      while (vertexIdMessageIterator.hasNext()) {
84        vertexIdMessageIterator.next();
85        I vertexId = vertexIdMessageIterator.getCurrentVertexId();
86        M currentMessage =
87            partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
88        if (currentMessage == null) {
89          M newMessage = messageCombiner.createInitialMessage();
90          currentMessage = partitionMap.putIfAbsent(
91              vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
92          if (currentMessage == null) {
93            currentMessage = newMessage;
94          }
95        }
96        synchronized (currentMessage) {
97          messageCombiner.combine(vertexId, currentMessage,
98              vertexIdMessageIterator.getCurrentMessage());
99        }
100     }
101   }
102 
103   @Override
104   public void addMessage(I vertexId, M message) throws IOException {
105     ConcurrentMap<I, M> partitionMap =
106       getOrCreatePartitionMap(getPartitionId(vertexId));
107     M currentMessage = partitionMap.get(vertexId);
108     if (currentMessage == null) {
109       M newMessage = messageCombiner.createInitialMessage();
110       // need a copy as vertexId might be reusable
111       I copyId = WritableUtils.createCopy(vertexId);
112       currentMessage = partitionMap.putIfAbsent(copyId, newMessage);
113       if (currentMessage == null) {
114         currentMessage = newMessage;
115       }
116     }
117     synchronized (currentMessage) {
118       messageCombiner.combine(vertexId, currentMessage, message);
119     }
120   }
121 
122   @Override
123   protected Iterable<M> getMessagesAsIterable(M message) {
124     return Collections.singleton(message);
125   }
126 
127   @Override
128   protected int getNumberOfMessagesIn(ConcurrentMap<I, M> partitionMap) {
129     return partitionMap.size();
130   }
131 
132   @Override
133   protected void writeMessages(M messages, DataOutput out) throws IOException {
134     messages.write(out);
135   }
136 
137   @Override
138   protected M readFieldsForMessages(DataInput in) throws IOException {
139     M message = messageValueFactory.newInstance();
140     message.readFields(in);
141     return message;
142   }
143 
144 
145   /**
146    * Create new factory for this message store
147    *
148    * @param service Worker service
149    * @param config  Hadoop configuration
150    * @param <I>     Vertex id
151    * @param <M>     Message data
152    * @return Factory
153    */
154   public static <I extends WritableComparable, M extends Writable>
155   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
156       CentralizedServiceWorker<I, ?, ?> service,
157       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
158     return new Factory<I, M>(service, config);
159   }
160 
161   /**
162    * Factory for {@link OneMessagePerVertexStore}
163    *
164    * @param <I> Vertex id
165    * @param <M> Message data
166    */
167   private static class Factory<I extends WritableComparable,
168       M extends Writable>
169       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
170     /** Service worker */
171     private PartitionSplitInfo<I> partitionInfo;
172     /** Hadoop configuration */
173     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
174 
175     /**
176      * @param partitionInfo Partition split info
177      * @param config  Hadoop configuration
178      */
179     public Factory(
180       PartitionSplitInfo<I> partitionInfo,
181       ImmutableClassesGiraphConfiguration<I, ?, ?> config
182     ) {
183       this.partitionInfo = partitionInfo;
184       this.config = config;
185     }
186 
187     @Override
188     public MessageStore<I, M> newStore(
189         MessageClasses<I, M> messageClasses) {
190       return new OneMessagePerVertexStore<I, M>(
191           messageClasses.createMessageValueFactory(config), partitionInfo,
192           messageClasses.createMessageCombiner(config), config);
193     }
194 
195     @Override
196     public void initialize(PartitionSplitInfo<I> partitionInfo,
197         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
198       this.partitionInfo = partitionInfo;
199       this.config = conf;
200     }
201   }
202 }