View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23  import org.apache.giraph.factories.MessageValueFactory;
24  import org.apache.giraph.partition.PartitionOwner;
25  import org.apache.giraph.utils.VertexIdIterator;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.ConcurrentMap;
32  
33  /**
34   * Abstract Implementation of {@link SimpleMessageStore} where
35   * multiple messages are stored per vertex as a list
36   * Used when there is no combiner provided.
37   *
38   * @param <I> Vertex id
39   * @param <M> Message data
40   * @param <L> List type
41   */
42  public abstract class AbstractListPerVertexStore<I extends WritableComparable,
43    M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
44  
45    /**
46     * Constructor
47     *
48     * @param messageValueFactory Message class held in the store
49     * @param service Service worker
50     * @param config Hadoop configuration
51     */
52    public AbstractListPerVertexStore(
53      MessageValueFactory<M> messageValueFactory,
54      CentralizedServiceWorker<I, ?, ?> service,
55      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
56      super(messageValueFactory, service, config);
57    }
58  
59    /**
60     * Create an instance of L
61     * @return instance of L
62     */
63    protected abstract L createList();
64  
65    /**
66     * Get the list of pointers for a vertex
67     * Each pointer has information of how to access an encoded message
68     * for this vertex
69     *
70     * @param iterator vertex id iterator
71     * @return pointer list
72     */
73    protected L getOrCreateList(VertexIdIterator<I> iterator) {
74      PartitionOwner owner =
75          service.getVertexPartitionOwner(iterator.getCurrentVertexId());
76      int partitionId = owner.getPartitionId();
77      ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
78      L list = partitionMap.get(iterator.getCurrentVertexId());
79      if (list == null) {
80        L newList = createList();
81        list = partitionMap.putIfAbsent(
82            iterator.releaseCurrentVertexId(), newList);
83        if (list == null) {
84          list = newList;
85        }
86      }
87      return list;
88    }
89  
90    @Override
91    public Iterable<M> getVertexMessages(I vertexId) {
92      ConcurrentMap<I, L> partitionMap =
93          map.get(getPartitionId(vertexId));
94      if (partitionMap == null) {
95        return Collections.<M>emptyList();
96      }
97      L list = partitionMap.get(vertexId);
98      return list == null ? Collections.<M>emptyList() :
99          getMessagesAsIterable(list);
100   }
101 }
102