View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Collections;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.giraph.bsp.CentralizedServiceWorker;
28  import org.apache.giraph.combiner.MessageCombiner;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.VertexIdMessageIterator;
33  import org.apache.giraph.utils.VertexIdMessages;
34  import org.apache.hadoop.io.Writable;
35  import org.apache.hadoop.io.WritableComparable;
36  
37  /**
38   * Implementation of {@link SimpleMessageStore} where we have a single
39   * message per vertex.
40   * Used when {@link org.apache.giraph.combiner.MessageCombiner} is provided.
41   *
42   * @param <I> Vertex id
43   * @param <M> Message data
44   */
45  public class OneMessagePerVertexStore<I extends WritableComparable,
46      M extends Writable> extends SimpleMessageStore<I, M, M> {
47    /** MessageCombiner for messages */
48    private final MessageCombiner<? super I, M> messageCombiner;
49  
50    /**
51     * @param messageValueFactory Message class held in the store
52     * @param service Service worker
53     * @param messageCombiner MessageCombiner for messages
54     * @param config Hadoop configuration
55     */
56    public OneMessagePerVertexStore(
57        MessageValueFactory<M> messageValueFactory,
58        CentralizedServiceWorker<I, ?, ?> service,
59        MessageCombiner<? super I, M> messageCombiner,
60        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
61      super(messageValueFactory, service, config);
62      this.messageCombiner =
63          messageCombiner;
64    }
65  
66    @Override
67    public boolean isPointerListEncoding() {
68      return false;
69    }
70  
71    @Override
72    public void addPartitionMessages(
73        int partitionId,
74        VertexIdMessages<I, M> messages) {
75      ConcurrentMap<I, M> partitionMap =
76          getOrCreatePartitionMap(partitionId);
77      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
78        messages.getVertexIdMessageIterator();
79      // This loop is a little complicated as it is optimized to only create
80      // the minimal amount of vertex id and message objects as possible.
81      while (vertexIdMessageIterator.hasNext()) {
82        vertexIdMessageIterator.next();
83        I vertexId = vertexIdMessageIterator.getCurrentVertexId();
84        M currentMessage =
85            partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
86        if (currentMessage == null) {
87          M newMessage = messageCombiner.createInitialMessage();
88          currentMessage = partitionMap.putIfAbsent(
89              vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
90          if (currentMessage == null) {
91            currentMessage = newMessage;
92          }
93        }
94        synchronized (currentMessage) {
95          messageCombiner.combine(vertexId, currentMessage,
96              vertexIdMessageIterator.getCurrentMessage());
97        }
98      }
99    }
100 
101   @Override
102   protected Iterable<M> getMessagesAsIterable(M message) {
103     return Collections.singleton(message);
104   }
105 
106   @Override
107   protected int getNumberOfMessagesIn(ConcurrentMap<I, M> partitionMap) {
108     return partitionMap.size();
109   }
110 
111   @Override
112   protected void writeMessages(M messages, DataOutput out) throws IOException {
113     messages.write(out);
114   }
115 
116   @Override
117   protected M readFieldsForMessages(DataInput in) throws IOException {
118     M message = messageValueFactory.newInstance();
119     message.readFields(in);
120     return message;
121   }
122 
123 
124   /**
125    * Create new factory for this message store
126    *
127    * @param service Worker service
128    * @param config  Hadoop configuration
129    * @param <I>     Vertex id
130    * @param <M>     Message data
131    * @return Factory
132    */
133   public static <I extends WritableComparable, M extends Writable>
134   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
135       CentralizedServiceWorker<I, ?, ?> service,
136       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
137     return new Factory<I, M>(service, config);
138   }
139 
140   /**
141    * Factory for {@link OneMessagePerVertexStore}
142    *
143    * @param <I> Vertex id
144    * @param <M> Message data
145    */
146   private static class Factory<I extends WritableComparable,
147       M extends Writable>
148       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
149     /** Service worker */
150     private CentralizedServiceWorker<I, ?, ?> service;
151     /** Hadoop configuration */
152     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
153 
154     /**
155      * @param service Worker service
156      * @param config  Hadoop configuration
157      */
158     public Factory(CentralizedServiceWorker<I, ?, ?> service,
159         ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
160       this.service = service;
161       this.config = config;
162     }
163 
164     @Override
165     public MessageStore<I, M> newStore(
166         MessageClasses<I, M> messageClasses) {
167       return new OneMessagePerVertexStore<I, M>(
168           messageClasses.createMessageValueFactory(config), service,
169           messageClasses.createMessageCombiner(config), config);
170     }
171 
172     @Override
173     public void initialize(CentralizedServiceWorker<I, ?, ?> service,
174         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
175       this.service = service;
176       this.config = conf;
177     }
178   }
179 }