This project has retired. For details please refer to its Attic page.
IdOneMessagePerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.primitives;
19  
20  import com.google.common.collect.Lists;
21  
22  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  
31  import org.apache.giraph.combiner.MessageCombiner;
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.comm.messages.PartitionSplitInfo;
34  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35  import org.apache.giraph.factories.MessageValueFactory;
36  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
37  import org.apache.giraph.types.ops.TypeOpsUtils;
38  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
39  import org.apache.giraph.types.ops.collections.WritableWriter;
40  import org.apache.giraph.utils.EmptyIterable;
41  import org.apache.giraph.utils.VertexIdMessageIterator;
42  import org.apache.giraph.utils.VertexIdMessages;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  
46  /**
47   * Special message store to be used when IDs are primitive and message doesn't
48   * need to be, and message combiner is used.
49   * Data is backed by primitive keyed maps in order to decrease number of
50   * objects and get better performance.
51   * (keys are using primitives, values are using objects, even if they
52   * are primitive)
53   *
54   * @param <I> Vertex id type
55   * @param <M> Message type
56   */
57  public class IdOneMessagePerVertexStore<I extends WritableComparable,
58      M extends Writable> implements MessageStore<I, M> {
59    /** Map from partition id to map from vertex id to message */
60    private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
61    /** Message value factory */
62    private final MessageValueFactory<M> messageValueFactory;
63    /** Message messageCombiner */
64    private final MessageCombiner<? super I, M> messageCombiner;
65    /** Partition split info */
66    private final PartitionSplitInfo<I> partitionInfo;
67    /** Giraph configuration */
68    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
69    /** Vertex id TypeOps */
70    private final PrimitiveIdTypeOps<I> idTypeOps;
71    /** WritableWriter for values in this message store */
72    private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
73      @Override
74      public M readFields(DataInput in) throws IOException {
75        M message = messageValueFactory.newInstance();
76        message.readFields(in);
77        return message;
78      }
79  
80      @Override
81      public void write(DataOutput out, M value) throws IOException {
82        value.write(out);
83      }
84    };
85  
86    /**
87     * Constructor
88     *
89     * @param messageValueFactory Message value factory
90     * @param partitionInfo Partition split info
91     * @param messageCombiner Message messageCombiner
92     * @param config Config
93     */
94    public IdOneMessagePerVertexStore(
95        MessageValueFactory<M> messageValueFactory,
96        PartitionSplitInfo<I> partitionInfo,
97        MessageCombiner<? super I, M> messageCombiner,
98        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
99      this.partitionInfo = partitionInfo;
100     this.config = config;
101     this.messageValueFactory = messageValueFactory;
102     this.messageCombiner = messageCombiner;
103 
104     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
105 
106     map = new Int2ObjectOpenHashMap<>();
107     for (int partitionId : partitionInfo.getPartitionIds()) {
108       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
109         Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)),
110         messageWriter
111       );
112       map.put(partitionId, partitionMap);
113     }
114   }
115 
116   /**
117    * Get map which holds messages for partition which vertex belongs to.
118    *
119    * @param vertexId Id of the vertex
120    * @return Map which holds messages for partition which vertex belongs to.
121    */
122   private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
123     return map.get(partitionInfo.getPartitionId(vertexId));
124   }
125 
126   @Override
127   public void addPartitionMessages(
128       int partitionId,
129       VertexIdMessages<I, M> messages) {
130     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
131     synchronized (partitionMap) {
132       VertexIdMessageIterator<I, M>
133           iterator = messages.getVertexIdMessageIterator();
134       // This loop is a little complicated as it is optimized to only create
135       // the minimal amount of vertex id and message objects as possible.
136       while (iterator.hasNext()) {
137         iterator.next();
138         I vertexId = iterator.getCurrentVertexId();
139         M currentMessage =
140             partitionMap.get(iterator.getCurrentVertexId());
141         if (currentMessage == null) {
142           M newMessage = messageCombiner.createInitialMessage();
143           currentMessage = partitionMap.put(
144               iterator.getCurrentVertexId(), newMessage);
145           if (currentMessage == null) {
146             currentMessage = newMessage;
147           }
148         }
149         messageCombiner.combine(vertexId, currentMessage,
150           iterator.getCurrentMessage());
151       }
152     }
153   }
154 
155   /**
156    * Adds a message for a particular vertex
157    *
158    * @param vertexId Id of target vertex
159    * @param message  A message to send
160    * @throws IOException
161    */
162   @Override
163   public void addMessage(I vertexId, M message) throws IOException {
164     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
165     synchronized (partitionMap) {
166       M currentMessage = partitionMap.get(vertexId);
167       if (currentMessage == null) {
168         M newMessage = messageCombiner.createInitialMessage();
169         currentMessage = partitionMap.put(vertexId, newMessage);
170         if (currentMessage == null) {
171           currentMessage = newMessage;
172         }
173       }
174       messageCombiner.combine(vertexId, currentMessage, message);
175     }
176   }
177 
178   @Override
179   public void clearPartition(int partitionId) {
180     map.get(partitionId).clear();
181   }
182 
183   @Override
184   public boolean hasMessagesForVertex(I vertexId) {
185     return getPartitionMap(vertexId).containsKey(vertexId);
186   }
187 
188   @Override
189   public boolean hasMessagesForPartition(int partitionId) {
190     Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
191     return partitionMessages != null && partitionMessages.size() != 0;
192   }
193 
194   @Override
195   public Iterable<M> getVertexMessages(I vertexId) {
196     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
197     if (!partitionMap.containsKey(vertexId)) {
198       return EmptyIterable.get();
199     } else {
200       return Collections.singleton(partitionMap.get(vertexId));
201     }
202   }
203 
204   @Override
205   public void clearVertexMessages(I vertexId) {
206     getPartitionMap(vertexId).remove(vertexId);
207   }
208 
209   @Override
210   public void clearAll() {
211     map.clear();
212   }
213 
214   @Override
215   public Iterable<I> getPartitionDestinationVertices(
216       int partitionId) {
217     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
218     List<I> vertices =
219         Lists.newArrayListWithCapacity(partitionMap.size());
220     Iterator<I> iterator = partitionMap.fastKeyIterator();
221     while (iterator.hasNext()) {
222       vertices.add(idTypeOps.createCopy(iterator.next()));
223     }
224     return vertices;
225   }
226 
227   @Override
228   public void writePartition(DataOutput out,
229       int partitionId) throws IOException {
230     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
231     partitionMap.write(out);
232   }
233 
234   @Override
235   public void readFieldsForPartition(DataInput in,
236       int partitionId) throws IOException {
237     Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
238         messageWriter);
239     partitionMap.readFields(in);
240     synchronized (map) {
241       map.put(partitionId, partitionMap);
242     }
243   }
244 
245   @Override
246   public void finalizeStore() {
247   }
248 
249   @Override
250   public boolean isPointerListEncoding() {
251     return false;
252   }
253 }