1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.messages.primitives.long_id;
2021import com.google.common.collect.Lists;
2223import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
24import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
25import it.unimi.dsi.fastutil.longs.LongIterator;
2627import java.util.List;
2829import org.apache.giraph.comm.messages.MessageStore;
30import org.apache.giraph.comm.messages.PartitionSplitInfo;
31import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32import org.apache.giraph.factories.MessageValueFactory;
33import org.apache.hadoop.io.LongWritable;
34import org.apache.hadoop.io.Writable;
3536/**37 * Special message store to be used when ids are LongWritable and no combiner38 * is used.39 * Uses fastutil primitive maps in order to decrease number of objects and40 * get better performance.41 *42 * @param <M> message type43 * @param <T> datastructure used to hold messages44 */45publicabstractclass LongAbstractStore<M extends Writable, T>
46implements MessageStore<LongWritable, M> {
47/** Message value factory */48protectedfinal MessageValueFactory<M> messageValueFactory;
49/** Map from partition id to map from vertex id to message */50protectedfinal51 Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
52/** Service worker */53protectedfinal PartitionSplitInfo<LongWritable> partitionInfo;
54/** Giraph configuration */55protectedfinal ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
56 config;
5758/**59 * Constructor60 *61 * @param messageValueFactory Factory for creating message values62 * @param partitionInfo Partition split info63 * @param config Hadoop configuration64 */65publicLongAbstractStore(
66 MessageValueFactory<M> messageValueFactory,
67 PartitionSplitInfo<LongWritable> partitionInfo,
68 ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
69 config) {
70this.messageValueFactory = messageValueFactory;
71this.partitionInfo = partitionInfo;
72this.config = config;
7374 map = new Int2ObjectOpenHashMap<>();
75for (int partitionId : partitionInfo.getPartitionIds()) {
76 Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
77 (int) partitionInfo.getPartitionVertexCount(partitionId));
78 map.put(partitionId, partitionMap);
79 }
80 }
8182/**83 * Get map which holds messages for partition which vertex belongs to.84 *85 * @param vertexId Id of the vertex86 * @return Map which holds messages for partition which vertex belongs to.87 */88protected Long2ObjectOpenHashMap<T> getPartitionMap(
89 LongWritable vertexId) {
90return map.get(partitionInfo.getPartitionId(vertexId));
91 }
9293 @Override
94publicvoid clearPartition(int partitionId) {
95 map.get(partitionId).clear();
96 }
9798 @Override
99publicboolean hasMessagesForVertex(LongWritable vertexId) {
100return getPartitionMap(vertexId).containsKey(vertexId.get());
101 }
102103 @Override
104publicboolean hasMessagesForPartition(int partitionId) {
105 Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
106return partitionMessages != null && !partitionMessages.isEmpty();
107 }
108109 @Override
110publicvoid clearVertexMessages(LongWritable vertexId) {
111 getPartitionMap(vertexId).remove(vertexId.get());
112 }
113114115 @Override
116publicvoid clearAll() {
117 map.clear();
118 }
119120 @Override
121public Iterable<LongWritable> getPartitionDestinationVertices(
122int partitionId) {
123 Long2ObjectOpenHashMap<T> partitionMap =
124 map.get(partitionId);
125 List<LongWritable> vertices =
126 Lists.newArrayListWithCapacity(partitionMap.size());
127 LongIterator iterator = partitionMap.keySet().iterator();
128while (iterator.hasNext()) {
129 vertices.add(new LongWritable(iterator.nextLong()));
130 }
131return vertices;
132 }
133 }