View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.comm.messages.primitives;
19  
20  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.apache.giraph.bsp.CentralizedServiceWorker;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.comm.messages.MessageStore;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.factories.MessageValueFactory;
34  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
35  import org.apache.giraph.types.ops.TypeOpsUtils;
36  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
37  import org.apache.giraph.types.ops.collections.WritableWriter;
38  import org.apache.giraph.utils.EmptyIterable;
39  import org.apache.giraph.utils.VertexIdMessageIterator;
40  import org.apache.giraph.utils.VertexIdMessages;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  
44  import com.google.common.collect.Lists;
45  
46  /**
47   * Special message store to be used when IDs are primitive and message doesn't
48   * need to be, and message combiner is used.
49   * Data is backed by primitive keyed maps in order to decrease number of
50   * objects and get better performance.
51   * (keys are using primitives, values are using objects, even if they
52   * are primitive)
53   *
54   * @param <I> Vertex id type
55   * @param <M> Message type
56   */
57  public class IdOneMessagePerVertexStore<I extends WritableComparable,
58      M extends Writable> implements MessageStore<I, M> {
59    /** Map from partition id to map from vertex id to message */
60    private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
61    /** Message value factory */
62    private final MessageValueFactory<M> messageValueFactory;
63    /** Message messageCombiner */
64    private final MessageCombiner<? super I, M> messageCombiner;
65    /** Service worker */
66    private final CentralizedServiceWorker<I, ?, ?> service;
67    /** Giraph configuration */
68    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
69    /** Vertex id TypeOps */
70    private final PrimitiveIdTypeOps<I> idTypeOps;
71    /** WritableWriter for values in this message store */
72    private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
73      @Override
74      public M readFields(DataInput in) throws IOException {
75        M message = messageValueFactory.newInstance();
76        message.readFields(in);
77        return message;
78      }
79  
80      @Override
81      public void write(DataOutput out, M value) throws IOException {
82        value.write(out);
83      }
84    };
85  
86    /**
87     * Constructor
88     *
89     * @param messageValueFactory Message value factory
90     * @param service Service worker
91     * @param messageCombiner Message messageCombiner
92     * @param config Config
93     */
94    public IdOneMessagePerVertexStore(
95        MessageValueFactory<M> messageValueFactory,
96        CentralizedServiceWorker<I, ?, ?> service,
97        MessageCombiner<? super I, M> messageCombiner,
98        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
99      this.service = service;
100     this.config = config;
101     this.messageValueFactory = messageValueFactory;
102     this.messageCombiner = messageCombiner;
103 
104     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
105 
106     map = new Int2ObjectOpenHashMap<>();
107     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
108       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
109           Math.max(10, (int) service.getPartitionStore()
110               .getPartitionVertexCount(partitionId)), messageWriter);
111       map.put(partitionId, partitionMap);
112     }
113   }
114 
115   /**
116    * Get map which holds messages for partition which vertex belongs to.
117    *
118    * @param vertexId Id of the vertex
119    * @return Map which holds messages for partition which vertex belongs to.
120    */
121   private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
122     return map.get(service.getPartitionId(vertexId));
123   }
124 
125   @Override
126   public void addPartitionMessages(
127       int partitionId,
128       VertexIdMessages<I, M> messages) {
129     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
130     synchronized (partitionMap) {
131       VertexIdMessageIterator<I, M>
132           iterator = messages.getVertexIdMessageIterator();
133       // This loop is a little complicated as it is optimized to only create
134       // the minimal amount of vertex id and message objects as possible.
135       while (iterator.hasNext()) {
136         iterator.next();
137         I vertexId = iterator.getCurrentVertexId();
138         M currentMessage =
139             partitionMap.get(iterator.getCurrentVertexId());
140         if (currentMessage == null) {
141           M newMessage = messageCombiner.createInitialMessage();
142           currentMessage = partitionMap.put(
143               iterator.getCurrentVertexId(), newMessage);
144           if (currentMessage == null) {
145             currentMessage = newMessage;
146           }
147         }
148         messageCombiner.combine(vertexId, currentMessage,
149           iterator.getCurrentMessage());
150       }
151     }
152   }
153 
154   @Override
155   public void clearPartition(int partitionId) {
156     map.get(partitionId).clear();
157   }
158 
159   @Override
160   public boolean hasMessagesForVertex(I vertexId) {
161     return getPartitionMap(vertexId).containsKey(vertexId);
162   }
163 
164   @Override
165   public boolean hasMessagesForPartition(int partitionId) {
166     Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
167     return partitionMessages != null && partitionMessages.size() != 0;
168   }
169 
170   @Override
171   public Iterable<M> getVertexMessages(I vertexId) {
172     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
173     if (!partitionMap.containsKey(vertexId)) {
174       return EmptyIterable.get();
175     } else {
176       return Collections.singleton(partitionMap.get(vertexId));
177     }
178   }
179 
180   @Override
181   public void clearVertexMessages(I vertexId) {
182     getPartitionMap(vertexId).remove(vertexId);
183   }
184 
185   @Override
186   public void clearAll() {
187     map.clear();
188   }
189 
190   @Override
191   public Iterable<I> getPartitionDestinationVertices(
192       int partitionId) {
193     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
194     List<I> vertices =
195         Lists.newArrayListWithCapacity(partitionMap.size());
196     Iterator<I> iterator = partitionMap.fastKeyIterator();
197     while (iterator.hasNext()) {
198       vertices.add(idTypeOps.createCopy(iterator.next()));
199     }
200     return vertices;
201   }
202 
203   @Override
204   public void writePartition(DataOutput out,
205       int partitionId) throws IOException {
206     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
207     partitionMap.write(out);
208   }
209 
210   @Override
211   public void readFieldsForPartition(DataInput in,
212       int partitionId) throws IOException {
213     Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
214         messageWriter);
215     partitionMap.readFields(in);
216     synchronized (map) {
217       map.put(partitionId, partitionMap);
218     }
219   }
220 
221   @Override
222   public void finalizeStore() {
223   }
224 
225   @Override
226   public boolean isPointerListEncoding() {
227     return false;
228   }
229 }