View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives;
20  
21  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
22  import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
23  import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
24  import it.unimi.dsi.fastutil.longs.LongIterator;
25  import it.unimi.dsi.fastutil.objects.ObjectIterator;
26  
27  import java.io.DataInput;
28  import java.io.DataOutput;
29  import java.io.IOException;
30  import java.util.Collections;
31  import java.util.List;
32  
33  import org.apache.giraph.bsp.CentralizedServiceWorker;
34  import org.apache.giraph.combiner.MessageCombiner;
35  import org.apache.giraph.comm.messages.MessageStore;
36  import org.apache.giraph.utils.EmptyIterable;
37  import org.apache.giraph.utils.VertexIdMessageIterator;
38  import org.apache.giraph.utils.VertexIdMessages;
39  import org.apache.hadoop.io.DoubleWritable;
40  import org.apache.hadoop.io.LongWritable;
41  import org.apache.hadoop.io.Writable;
42  
43  import com.google.common.collect.Lists;
44  
45  /**
46   * Special message store to be used when ids are LongWritable and messages
47   * are DoubleWritable and messageCombiner is used.
48   * Uses fastutil primitive maps in order to decrease number of objects and
49   * get better performance.
50   */
51  public class LongDoubleMessageStore
52      implements MessageStore<LongWritable, DoubleWritable> {
53    /** Map from partition id to map from vertex id to message */
54    private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
55    /** Message messageCombiner */
56    private final
57    MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
58    /** Service worker */
59    private final CentralizedServiceWorker<LongWritable, ?, ?> service;
60  
61    /**
62     * Constructor
63     *
64     * @param service Service worker
65     * @param messageCombiner Message messageCombiner
66     */
67    public LongDoubleMessageStore(
68        CentralizedServiceWorker<LongWritable, Writable, Writable> service,
69        MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner) {
70      this.service = service;
71      this.messageCombiner =
72          messageCombiner;
73  
74      map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
75      for (int partitionId : service.getPartitionStore().getPartitionIds()) {
76        Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
77            (int) service.getPartitionStore()
78                .getPartitionVertexCount(partitionId));
79        map.put(partitionId, partitionMap);
80      }
81    }
82  
83    @Override
84    public boolean isPointerListEncoding() {
85      return false;
86    }
87  
88    /**
89     * Get map which holds messages for partition which vertex belongs to.
90     *
91     * @param vertexId Id of the vertex
92     * @return Map which holds messages for partition which vertex belongs to.
93     */
94    private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
95      return map.get(service.getPartitionId(vertexId));
96    }
97  
98    @Override
99    public void addPartitionMessages(int partitionId,
100       VertexIdMessages<LongWritable, DoubleWritable> messages) {
101     LongWritable reusableVertexId = new LongWritable();
102     DoubleWritable reusableMessage = new DoubleWritable();
103     DoubleWritable reusableCurrentMessage = new DoubleWritable();
104 
105     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
106     synchronized (partitionMap) {
107       VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
108         messages.getVertexIdMessageIterator();
109       while (iterator.hasNext()) {
110         iterator.next();
111         long vertexId = iterator.getCurrentVertexId().get();
112         double message = iterator.getCurrentMessage().get();
113         if (partitionMap.containsKey(vertexId)) {
114           reusableVertexId.set(vertexId);
115           reusableMessage.set(message);
116           reusableCurrentMessage.set(partitionMap.get(vertexId));
117           messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
118               reusableMessage);
119           message = reusableCurrentMessage.get();
120         }
121         partitionMap.put(vertexId, message);
122       }
123     }
124   }
125 
126   @Override
127   public void finalizeStore() {
128   }
129 
130   @Override
131   public void clearPartition(int partitionId) {
132     map.get(partitionId).clear();
133   }
134 
135   @Override
136   public boolean hasMessagesForVertex(LongWritable vertexId) {
137     return getPartitionMap(vertexId).containsKey(vertexId.get());
138   }
139 
140   @Override
141   public boolean hasMessagesForPartition(int partitionId) {
142     Long2DoubleOpenHashMap partitionMessages = map.get(partitionId);
143     return partitionMessages != null && !partitionMessages.isEmpty();
144   }
145 
146   @Override
147   public Iterable<DoubleWritable> getVertexMessages(
148       LongWritable vertexId) {
149     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
150     if (!partitionMap.containsKey(vertexId.get())) {
151       return EmptyIterable.get();
152     } else {
153       return Collections.singleton(
154           new DoubleWritable(partitionMap.get(vertexId.get())));
155     }
156   }
157 
158   @Override
159   public void clearVertexMessages(LongWritable vertexId) {
160     getPartitionMap(vertexId).remove(vertexId.get());
161   }
162 
163   @Override
164   public void clearAll() {
165     map.clear();
166   }
167 
168   @Override
169   public Iterable<LongWritable> getPartitionDestinationVertices(
170       int partitionId) {
171     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
172     List<LongWritable> vertices =
173         Lists.newArrayListWithCapacity(partitionMap.size());
174     LongIterator iterator = partitionMap.keySet().iterator();
175     while (iterator.hasNext()) {
176       vertices.add(new LongWritable(iterator.nextLong()));
177     }
178     return vertices;
179   }
180 
181   @Override
182   public void writePartition(DataOutput out,
183       int partitionId) throws IOException {
184     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
185     out.writeInt(partitionMap.size());
186     ObjectIterator<Long2DoubleMap.Entry> iterator =
187         partitionMap.long2DoubleEntrySet().fastIterator();
188     while (iterator.hasNext()) {
189       Long2DoubleMap.Entry entry = iterator.next();
190       out.writeLong(entry.getLongKey());
191       out.writeDouble(entry.getDoubleValue());
192     }
193   }
194 
195   @Override
196   public void readFieldsForPartition(DataInput in,
197       int partitionId) throws IOException {
198     int size = in.readInt();
199     Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(size);
200     while (size-- > 0) {
201       long vertexId = in.readLong();
202       double message = in.readDouble();
203       partitionMap.put(vertexId, message);
204     }
205     synchronized (map) {
206       map.put(partitionId, partitionMap);
207     }
208   }
209 }