1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.messages;
2021import java.util.Collections;
22import java.util.List;
23import java.util.concurrent.ConcurrentMap;
2425import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26import org.apache.giraph.factories.MessageValueFactory;
27import org.apache.giraph.utils.VertexIdIterator;
28import org.apache.giraph.utils.WritableUtils;
29import org.apache.hadoop.io.Writable;
30import org.apache.hadoop.io.WritableComparable;
3132/**33 * Abstract Implementation of {@link SimpleMessageStore} where34 * multiple messages are stored per vertex as a list35 * Used when there is no combiner provided.36 *37 * @param <I> Vertex id38 * @param <M> Message data39 * @param <L> List type40 */41publicabstractclass AbstractListPerVertexStore<I extends WritableComparable,
42 M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
4344/**45 * Constructor46 *47 * @param messageValueFactory Message class held in the store48 * @param partitionInfo Partition split info49 * @param config Hadoop configuration50 */51publicAbstractListPerVertexStore(
52 MessageValueFactory<M> messageValueFactory,
53 PartitionSplitInfo<I> partitionInfo,
54 ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
55super(messageValueFactory, partitionInfo, config);
56 }
5758/**59 * Create an instance of L60 * @return instance of L61 */62protectedabstract L createList();
6364/**65 * Get the list of pointers for a vertex66 * Each pointer has information of how to access an encoded message67 * for this vertex68 *69 * @param iterator vertex id iterator70 * @return pointer list71 */72protected L getOrCreateList(VertexIdIterator<I> iterator) {
73int partitionId = getPartitionId(iterator.getCurrentVertexId());
74 ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
75 L list = partitionMap.get(iterator.getCurrentVertexId());
76if (list == null) {
77 L newList = createList();
78 list = partitionMap.putIfAbsent(
79 iterator.releaseCurrentVertexId(), newList);
80if (list == null) {
81 list = newList;
82 }
83 }
84return list;
85 }
8687/**88 * Get the list of pointers for a vertex89 * Each pointer has information of how to access an encoded message90 * for this vertex91 * This method will take ownership of the vertex id from the92 * iterator if necessary (when used in the partition map entry)93 *94 * @param vertexId vertex id95 * @return pointer list96 */97protected L getOrCreateList(I vertexId) {
98int partitionId = getPartitionId(vertexId);
99 ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
100 L list = partitionMap.get(vertexId);
101if (list == null) {
102 L newList = createList();
103 I copyId = WritableUtils.createCopy(vertexId);
104 list = partitionMap.putIfAbsent(copyId, newList);
105if (list == null) {
106 list = newList;
107 }
108 }
109return list;
110 }
111112 @Override
113public Iterable<M> getVertexMessages(I vertexId) {
114 ConcurrentMap<I, L> partitionMap =
115 map.get(getPartitionId(vertexId));
116if (partitionMap == null) {
117return Collections.<M>emptyList();
118 }
119 L list = partitionMap.get(vertexId);
120return list == null ? Collections.<M>emptyList() :
121 getMessagesAsIterable(list);
122 }
123 }
124