View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import com.google.common.collect.Lists;
22  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
24  import it.unimi.dsi.fastutil.longs.LongIterator;
25  import org.apache.giraph.bsp.CentralizedServiceWorker;
26  import org.apache.giraph.comm.messages.MessageStore;
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.factories.MessageValueFactory;
29  import org.apache.hadoop.io.LongWritable;
30  import org.apache.hadoop.io.Writable;
31  
32  import java.util.List;
33  
34  /**
35   * Special message store to be used when ids are LongWritable and no combiner
36   * is used.
37   * Uses fastutil primitive maps in order to decrease number of objects and
38   * get better performance.
39   *
40   * @param <M> message type
41   * @param <T> datastructure used to hold messages
42   */
43  public abstract class LongAbstractMessageStore<M extends Writable, T>
44    implements MessageStore<LongWritable, M> {
45    /** Message value factory */
46    protected final MessageValueFactory<M> messageValueFactory;
47    /** Map from partition id to map from vertex id to message */
48    protected final
49    Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
50    /** Service worker */
51    protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
52    /** Giraph configuration */
53    protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
54    config;
55  
56    /**
57     * Constructor
58     *
59     * @param messageValueFactory Factory for creating message values
60     * @param service      Service worker
61     * @param config       Hadoop configuration
62     */
63    public LongAbstractMessageStore(
64        MessageValueFactory<M> messageValueFactory,
65        CentralizedServiceWorker<LongWritable, Writable, Writable> service,
66        ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
67            config) {
68      this.messageValueFactory = messageValueFactory;
69      this.service = service;
70      this.config = config;
71  
72      map = new Int2ObjectOpenHashMap<>();
73      for (int partitionId : service.getPartitionStore().getPartitionIds()) {
74        Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
75            (int) service.getPartitionStore()
76                .getPartitionVertexCount(partitionId));
77        map.put(partitionId, partitionMap);
78      }
79    }
80  
81    /**
82     * Get map which holds messages for partition which vertex belongs to.
83     *
84     * @param vertexId Id of the vertex
85     * @return Map which holds messages for partition which vertex belongs to.
86     */
87    protected Long2ObjectOpenHashMap<T> getPartitionMap(
88        LongWritable vertexId) {
89      return map.get(service.getPartitionId(vertexId));
90    }
91  
92    @Override
93    public void clearPartition(int partitionId) {
94      map.get(partitionId).clear();
95    }
96  
97    @Override
98    public boolean hasMessagesForVertex(LongWritable vertexId) {
99      return getPartitionMap(vertexId).containsKey(vertexId.get());
100   }
101 
102   @Override
103   public boolean hasMessagesForPartition(int partitionId) {
104     Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
105     return partitionMessages != null && !partitionMessages.isEmpty();
106   }
107 
108   @Override
109   public void clearVertexMessages(LongWritable vertexId) {
110     getPartitionMap(vertexId).remove(vertexId.get());
111   }
112 
113 
114   @Override
115   public void clearAll() {
116     map.clear();
117   }
118 
119   @Override
120   public Iterable<LongWritable> getPartitionDestinationVertices(
121       int partitionId) {
122     Long2ObjectOpenHashMap<T> partitionMap =
123         map.get(partitionId);
124     List<LongWritable> vertices =
125         Lists.newArrayListWithCapacity(partitionMap.size());
126     LongIterator iterator = partitionMap.keySet().iterator();
127     while (iterator.hasNext()) {
128       vertices.add(new LongWritable(iterator.nextLong()));
129     }
130     return vertices;
131   }
132 }