This project has retired. For details please refer to its Attic page.
SimpleMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import com.google.common.collect.MapMaker;
22  import com.google.common.collect.Maps;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.util.Collections;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentMap;
30  
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.factories.MessageValueFactory;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  
36  /**
37   * Abstract class for {@link MessageStore} which allows any kind
38   * of object to hold messages for one vertex.
39   * Simple in memory message store implemented with a two level concurrent
40   * hash map.
41   *
42   * @param <I> Vertex id
43   * @param <M> Message data
44   * @param <T> Type of object which holds messages for one vertex
45   */
46  public abstract class SimpleMessageStore<I extends WritableComparable,
47      M extends Writable, T> implements MessageStore<I, M>  {
48    /** Message class */
49    protected final MessageValueFactory<M> messageValueFactory;
50    /** Partition split info */
51    protected final PartitionSplitInfo<I> partitionInfo;
52    /** Map from partition id to map from vertex id to messages for that vertex */
53    protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
54    /** Giraph configuration */
55    protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
56  
57    /**
58     * Constructor
59     *
60     * @param messageValueFactory Message class held in the store
61     * @param partitionInfo Partition split info
62     * @param config Giraph configuration
63     */
64    public SimpleMessageStore(
65        MessageValueFactory<M> messageValueFactory,
66        PartitionSplitInfo<I> partitionInfo,
67        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
68      this.messageValueFactory = messageValueFactory;
69      this.partitionInfo = partitionInfo;
70      this.config = config;
71      map = new MapMaker().concurrencyLevel(
72          config.getNettyServerExecutionConcurrency()).makeMap();
73    }
74  
75    /**
76     * Get messages as an iterable from message storage
77     *
78     * @param messages Message storage
79     * @return Messages as an iterable
80     */
81    protected abstract Iterable<M> getMessagesAsIterable(T messages);
82  
83    /**
84     * Get number of messages in partition map
85     *
86     * @param partitionMap Partition map in which to count messages
87     * @return Number of messages in partition map
88     */
89    protected abstract int getNumberOfMessagesIn(
90        ConcurrentMap<I, T> partitionMap);
91  
92    /**
93     * Write message storage to {@link DataOutput}
94     *
95     * @param messages Message storage
96     * @param out Data output
97     * @throws IOException
98     */
99    protected abstract void writeMessages(T messages, DataOutput out) throws
100       IOException;
101 
102   /**
103    * Read message storage from {@link DataInput}
104    *
105    * @param in Data input
106    * @return Message storage
107    * @throws IOException
108    */
109   protected abstract T readFieldsForMessages(DataInput in) throws IOException;
110 
111   /**
112    * Get id of partition which holds vertex with selected id
113    *
114    * @param vertexId Id of vertex
115    * @return Id of partiton
116    */
117   protected int getPartitionId(I vertexId) {
118     return partitionInfo.getPartitionId(vertexId);
119   }
120 
121   /**
122    * If there is already a map of messages related to the partition id
123    * return that map, otherwise create a new one, put it in global map and
124    * return it.
125    *
126    * @param partitionId Id of partition
127    * @return Message map for this partition
128    */
129   protected ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
130     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
131     if (partitionMap == null) {
132       ConcurrentMap<I, T> tmpMap = new MapMaker().concurrencyLevel(
133           config.getNettyServerExecutionConcurrency()).makeMap();
134       partitionMap = map.putIfAbsent(partitionId, tmpMap);
135       if (partitionMap == null) {
136         partitionMap = tmpMap;
137       }
138     }
139     return partitionMap;
140   }
141 
142   @Override
143   public void finalizeStore() {
144   }
145 
146   @Override
147   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
148     ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
149     return (partitionMap == null) ? Collections.<I>emptyList() :
150         partitionMap.keySet();
151   }
152 
153   @Override
154   public boolean hasMessagesForVertex(I vertexId) {
155     ConcurrentMap<I, ?> partitionMap =
156         map.get(getPartitionId(vertexId));
157     return partitionMap != null && partitionMap.containsKey(vertexId);
158   }
159 
160   @Override
161   public Iterable<M> getVertexMessages(I vertexId) {
162     ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
163     if (partitionMap == null) {
164       return Collections.<M>emptyList();
165     }
166     T messages = partitionMap.get(vertexId);
167     return (messages == null) ? Collections.<M>emptyList() :
168         getMessagesAsIterable(messages);
169   }
170 
171   @Override
172   public void writePartition(DataOutput out,
173       int partitionId) throws IOException {
174     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
175     out.writeBoolean(partitionMap != null);
176     if (partitionMap != null) {
177       out.writeInt(partitionMap.size());
178       for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
179         entry.getKey().write(out);
180         writeMessages(entry.getValue(), out);
181       }
182     }
183   }
184 
185   @Override
186   public void readFieldsForPartition(DataInput in,
187       int partitionId) throws IOException {
188     if (in.readBoolean()) {
189       ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
190       int numVertices = in.readInt();
191       for (int v = 0; v < numVertices; v++) {
192         I vertexId = config.createVertexId();
193         vertexId.readFields(in);
194         partitionMap.put(vertexId, readFieldsForMessages(in));
195       }
196       map.put(partitionId, partitionMap);
197     }
198   }
199 
200   @Override
201   public void clearVertexMessages(I vertexId) {
202     ConcurrentMap<I, ?> partitionMap =
203         map.get(getPartitionId(vertexId));
204     if (partitionMap != null) {
205       partitionMap.remove(vertexId);
206     }
207   }
208 
209   @Override
210   public void clearPartition(int partitionId) {
211     map.remove(partitionId);
212   }
213 
214   @Override
215   public boolean hasMessagesForPartition(int partitionId) {
216     ConcurrentMap<I, T> partitionMessages = map.get(partitionId);
217     return partitionMessages != null && !partitionMessages.isEmpty();
218   }
219 
220   @Override
221   public void clearAll() {
222     map.clear();
223   }
224 }