This project has retired. For details please refer to its Attic page.
LongAbstractStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import com.google.common.collect.Lists;
22  
23  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
24  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
25  import it.unimi.dsi.fastutil.longs.LongIterator;
26  
27  import java.util.List;
28  
29  import org.apache.giraph.comm.messages.MessageStore;
30  import org.apache.giraph.comm.messages.PartitionSplitInfo;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.factories.MessageValueFactory;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.io.Writable;
35  
36  /**
37   * Special message store to be used when ids are LongWritable and no combiner
38   * is used.
39   * Uses fastutil primitive maps in order to decrease number of objects and
40   * get better performance.
41   *
42   * @param <M> message type
43   * @param <T> datastructure used to hold messages
44   */
45  public abstract class LongAbstractStore<M extends Writable, T>
46    implements MessageStore<LongWritable, M> {
47    /** Message value factory */
48    protected final MessageValueFactory<M> messageValueFactory;
49    /** Map from partition id to map from vertex id to message */
50    protected final
51    Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
52    /** Service worker */
53    protected final PartitionSplitInfo<LongWritable> partitionInfo;
54    /** Giraph configuration */
55    protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
56    config;
57  
58    /**
59     * Constructor
60     *
61     * @param messageValueFactory Factory for creating message values
62     * @param partitionInfo       Partition split info
63     * @param config              Hadoop configuration
64     */
65    public LongAbstractStore(
66        MessageValueFactory<M> messageValueFactory,
67        PartitionSplitInfo<LongWritable> partitionInfo,
68        ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
69            config) {
70      this.messageValueFactory = messageValueFactory;
71      this.partitionInfo = partitionInfo;
72      this.config = config;
73  
74      map = new Int2ObjectOpenHashMap<>();
75      for (int partitionId : partitionInfo.getPartitionIds()) {
76        Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
77            (int) partitionInfo.getPartitionVertexCount(partitionId));
78        map.put(partitionId, partitionMap);
79      }
80    }
81  
82    /**
83     * Get map which holds messages for partition which vertex belongs to.
84     *
85     * @param vertexId Id of the vertex
86     * @return Map which holds messages for partition which vertex belongs to.
87     */
88    protected Long2ObjectOpenHashMap<T> getPartitionMap(
89        LongWritable vertexId) {
90      return map.get(partitionInfo.getPartitionId(vertexId));
91    }
92  
93    @Override
94    public void clearPartition(int partitionId) {
95      map.get(partitionId).clear();
96    }
97  
98    @Override
99    public boolean hasMessagesForVertex(LongWritable vertexId) {
100     return getPartitionMap(vertexId).containsKey(vertexId.get());
101   }
102 
103   @Override
104   public boolean hasMessagesForPartition(int partitionId) {
105     Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
106     return partitionMessages != null && !partitionMessages.isEmpty();
107   }
108 
109   @Override
110   public void clearVertexMessages(LongWritable vertexId) {
111     getPartitionMap(vertexId).remove(vertexId.get());
112   }
113 
114 
115   @Override
116   public void clearAll() {
117     map.clear();
118   }
119 
120   @Override
121   public Iterable<LongWritable> getPartitionDestinationVertices(
122       int partitionId) {
123     Long2ObjectOpenHashMap<T> partitionMap =
124         map.get(partitionId);
125     List<LongWritable> vertices =
126         Lists.newArrayListWithCapacity(partitionMap.size());
127     LongIterator iterator = partitionMap.keySet().iterator();
128     while (iterator.hasNext()) {
129       vertices.add(new LongWritable(iterator.nextLong()));
130     }
131     return vertices;
132   }
133 }