View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import com.google.common.collect.MapMaker;
22  import com.google.common.collect.Maps;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.Collections;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentMap;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.bsp.CentralizedServiceWorker;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.hadoop.io.Writable;
33  import org.apache.hadoop.io.WritableComparable;
34  
35  /**
36   * Abstract class for {@link MessageStore} which allows any kind
37   * of object to hold messages for one vertex.
38   * Simple in memory message store implemented with a two level concurrent
39   * hash map.
40   *
41   * @param <I> Vertex id
42   * @param <M> Message data
43   * @param <T> Type of object which holds messages for one vertex
44   */
45  public abstract class SimpleMessageStore<I extends WritableComparable,
46      M extends Writable, T> implements MessageStore<I, M>  {
47    /** Message class */
48    protected final MessageValueFactory<M> messageValueFactory;
49    /** Service worker */
50    protected final CentralizedServiceWorker<I, ?, ?> service;
51    /** Map from partition id to map from vertex id to messages for that vertex */
52    protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
53    /** Giraph configuration */
54    protected final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
55  
56    /**
57     * Constructor
58     *
59     * @param messageValueFactory Message class held in the store
60     * @param service Service worker
61     * @param config Giraph configuration
62     */
63    public SimpleMessageStore(
64        MessageValueFactory<M> messageValueFactory,
65        CentralizedServiceWorker<I, ?, ?> service,
66        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
67      this.messageValueFactory = messageValueFactory;
68      this.service = service;
69      this.config = config;
70      map = new MapMaker().concurrencyLevel(
71          config.getNettyServerExecutionConcurrency()).makeMap();
72    }
73  
74    /**
75     * Get messages as an iterable from message storage
76     *
77     * @param messages Message storage
78     * @return Messages as an iterable
79     */
80    protected abstract Iterable<M> getMessagesAsIterable(T messages);
81  
82    /**
83     * Get number of messages in partition map
84     *
85     * @param partitionMap Partition map in which to count messages
86     * @return Number of messages in partition map
87     */
88    protected abstract int getNumberOfMessagesIn(
89        ConcurrentMap<I, T> partitionMap);
90  
91    /**
92     * Write message storage to {@link DataOutput}
93     *
94     * @param messages Message storage
95     * @param out Data output
96     * @throws IOException
97     */
98    protected abstract void writeMessages(T messages, DataOutput out) throws
99        IOException;
100 
101   /**
102    * Read message storage from {@link DataInput}
103    *
104    * @param in Data input
105    * @return Message storage
106    * @throws IOException
107    */
108   protected abstract T readFieldsForMessages(DataInput in) throws IOException;
109 
110   /**
111    * Get id of partition which holds vertex with selected id
112    *
113    * @param vertexId Id of vertex
114    * @return Id of partiton
115    */
116   protected int getPartitionId(I vertexId) {
117     return service.getVertexPartitionOwner(vertexId).getPartitionId();
118   }
119 
120   /**
121    * If there is already a map of messages related to the partition id
122    * return that map, otherwise create a new one, put it in global map and
123    * return it.
124    *
125    * @param partitionId Id of partition
126    * @return Message map for this partition
127    */
128   protected ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
129     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
130     if (partitionMap == null) {
131       ConcurrentMap<I, T> tmpMap = new MapMaker().concurrencyLevel(
132           config.getNettyServerExecutionConcurrency()).makeMap();
133       partitionMap = map.putIfAbsent(partitionId, tmpMap);
134       if (partitionMap == null) {
135         partitionMap = tmpMap;
136       }
137     }
138     return partitionMap;
139   }
140 
141   @Override
142   public void finalizeStore() {
143   }
144 
145   @Override
146   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
147     ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
148     return (partitionMap == null) ? Collections.<I>emptyList() :
149         partitionMap.keySet();
150   }
151 
152   @Override
153   public boolean hasMessagesForVertex(I vertexId) {
154     ConcurrentMap<I, ?> partitionMap =
155         map.get(getPartitionId(vertexId));
156     return partitionMap != null && partitionMap.containsKey(vertexId);
157   }
158 
159   @Override
160   public Iterable<M> getVertexMessages(I vertexId) {
161     ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
162     if (partitionMap == null) {
163       return Collections.<M>emptyList();
164     }
165     T messages = partitionMap.get(vertexId);
166     return (messages == null) ? Collections.<M>emptyList() :
167         getMessagesAsIterable(messages);
168   }
169 
170   @Override
171   public void writePartition(DataOutput out,
172       int partitionId) throws IOException {
173     ConcurrentMap<I, T> partitionMap = map.get(partitionId);
174     out.writeBoolean(partitionMap != null);
175     if (partitionMap != null) {
176       out.writeInt(partitionMap.size());
177       for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
178         entry.getKey().write(out);
179         writeMessages(entry.getValue(), out);
180       }
181     }
182   }
183 
184   @Override
185   public void readFieldsForPartition(DataInput in,
186       int partitionId) throws IOException {
187     if (in.readBoolean()) {
188       ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
189       int numVertices = in.readInt();
190       for (int v = 0; v < numVertices; v++) {
191         I vertexId = config.createVertexId();
192         vertexId.readFields(in);
193         partitionMap.put(vertexId, readFieldsForMessages(in));
194       }
195       map.put(partitionId, partitionMap);
196     }
197   }
198 
199   @Override
200   public void clearVertexMessages(I vertexId) {
201     ConcurrentMap<I, ?> partitionMap =
202         map.get(getPartitionId(vertexId));
203     if (partitionMap != null) {
204       partitionMap.remove(vertexId);
205     }
206   }
207 
208   @Override
209   public void clearPartition(int partitionId) {
210     map.remove(partitionId);
211   }
212 
213   @Override
214   public boolean hasMessagesForPartition(int partitionId) {
215     ConcurrentMap<I, T> partitionMessages = map.get(partitionId);
216     return partitionMessages != null && !partitionMessages.isEmpty();
217   }
218 
219   @Override
220   public void clearAll() {
221     map.clear();
222   }
223 }