This project has retired. For details please refer to its Attic page.
AbstractListPerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.concurrent.ConcurrentMap;
24  
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.factories.MessageValueFactory;
27  import org.apache.giraph.utils.VertexIdIterator;
28  import org.apache.giraph.utils.WritableUtils;
29  import org.apache.hadoop.io.Writable;
30  import org.apache.hadoop.io.WritableComparable;
31  
32  /**
33   * Abstract Implementation of {@link SimpleMessageStore} where
34   * multiple messages are stored per vertex as a list
35   * Used when there is no combiner provided.
36   *
37   * @param <I> Vertex id
38   * @param <M> Message data
39   * @param <L> List type
40   */
41  public abstract class AbstractListPerVertexStore<I extends WritableComparable,
42    M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
43  
44    /**
45     * Constructor
46     *
47     * @param messageValueFactory Message class held in the store
48     * @param partitionInfo Partition split info
49     * @param config Hadoop configuration
50     */
51    public AbstractListPerVertexStore(
52      MessageValueFactory<M> messageValueFactory,
53      PartitionSplitInfo<I> partitionInfo,
54      ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
55      super(messageValueFactory, partitionInfo, config);
56    }
57  
58    /**
59     * Create an instance of L
60     * @return instance of L
61     */
62    protected abstract L createList();
63  
64    /**
65     * Get the list of pointers for a vertex
66     * Each pointer has information of how to access an encoded message
67     * for this vertex
68     *
69     * @param iterator vertex id iterator
70     * @return pointer list
71     */
72    protected L getOrCreateList(VertexIdIterator<I> iterator) {
73      int partitionId = getPartitionId(iterator.getCurrentVertexId());
74      ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
75      L list = partitionMap.get(iterator.getCurrentVertexId());
76      if (list == null) {
77        L newList = createList();
78        list = partitionMap.putIfAbsent(
79          iterator.releaseCurrentVertexId(), newList);
80        if (list == null) {
81          list = newList;
82        }
83      }
84      return list;
85    }
86  
87    /**
88     * Get the list of pointers for a vertex
89     * Each pointer has information of how to access an encoded message
90     * for this vertex
91     * This method will take ownership of the vertex id from the
92     * iterator if necessary (when used in the partition map entry)
93     *
94     * @param vertexId vertex id
95     * @return pointer list
96     */
97    protected L getOrCreateList(I vertexId) {
98      int partitionId = getPartitionId(vertexId);
99      ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
100     L list = partitionMap.get(vertexId);
101     if (list == null) {
102       L newList = createList();
103       I copyId = WritableUtils.createCopy(vertexId);
104       list = partitionMap.putIfAbsent(copyId, newList);
105       if (list == null) {
106         list = newList;
107       }
108     }
109     return list;
110   }
111 
112   @Override
113   public Iterable<M> getVertexMessages(I vertexId) {
114     ConcurrentMap<I, L> partitionMap =
115         map.get(getPartitionId(vertexId));
116     if (partitionMap == null) {
117       return Collections.<M>emptyList();
118     }
119     L list = partitionMap.get(vertexId);
120     return list == null ? Collections.<M>emptyList() :
121         getMessagesAsIterable(list);
122   }
123 }
124