This project has retired. For details, please refer to its Attic page.
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  package org.apache.giraph.comm.messages.primitives;
19  
20  import com.google.common.collect.Lists;
21  
22  import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  
31  import org.apache.giraph.combiner.MessageCombiner;
32  import org.apache.giraph.comm.messages.MessageStore;
33  import org.apache.giraph.comm.messages.PartitionSplitInfo;
34  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
35  import org.apache.giraph.factories.MessageValueFactory;
36  import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
37  import org.apache.giraph.types.ops.TypeOpsUtils;
38  import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
39  import org.apache.giraph.types.ops.collections.WritableWriter;
40  import org.apache.giraph.utils.EmptyIterable;
41  import org.apache.giraph.utils.VertexIdMessageIterator;
42  import org.apache.giraph.utils.VertexIdMessages;
43  import org.apache.hadoop.io.Writable;
44  import org.apache.hadoop.io.WritableComparable;
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  public class IdOneMessagePerVertexStore<I extends WritableComparable,
58      M extends Writable> implements MessageStore<I, M> {
59    
60    private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
61    
62    private final MessageValueFactory<M> messageValueFactory;
63    
64    private final MessageCombiner<? super I, M> messageCombiner;
65    
66    private final PartitionSplitInfo<I> partitionInfo;
67    
68    private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
69    
70    private final PrimitiveIdTypeOps<I> idTypeOps;
71    
72    private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
73      @Override
74      public M readFields(DataInput in) throws IOException {
75        M message = messageValueFactory.newInstance();
76        message.readFields(in);
77        return message;
78      }
79  
80      @Override
81      public void write(DataOutput out, M value) throws IOException {
82        value.write(out);
83      }
84    };
85  
86    
87  
88  
89  
90  
91  
92  
93  
94    public IdOneMessagePerVertexStore(
95        MessageValueFactory<M> messageValueFactory,
96        PartitionSplitInfo<I> partitionInfo,
97        MessageCombiner<? super I, M> messageCombiner,
98        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
99      this.partitionInfo = partitionInfo;
100     this.config = config;
101     this.messageValueFactory = messageValueFactory;
102     this.messageCombiner = messageCombiner;
103 
104     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
105 
106     map = new Int2ObjectOpenHashMap<>();
107     for (int partitionId : partitionInfo.getPartitionIds()) {
108       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
109         Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)),
110         messageWriter
111       );
112       map.put(partitionId, partitionMap);
113     }
114   }
115 
116   
117 
118 
119 
120 
121 
122   private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
123     return map.get(partitionInfo.getPartitionId(vertexId));
124   }
125 
126   @Override
127   public void addPartitionMessages(
128       int partitionId,
129       VertexIdMessages<I, M> messages) {
130     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
131     synchronized (partitionMap) {
132       VertexIdMessageIterator<I, M>
133           iterator = messages.getVertexIdMessageIterator();
134       
135       
136       while (iterator.hasNext()) {
137         iterator.next();
138         I vertexId = iterator.getCurrentVertexId();
139         M currentMessage =
140             partitionMap.get(iterator.getCurrentVertexId());
141         if (currentMessage == null) {
142           M newMessage = messageCombiner.createInitialMessage();
143           currentMessage = partitionMap.put(
144               iterator.getCurrentVertexId(), newMessage);
145           if (currentMessage == null) {
146             currentMessage = newMessage;
147           }
148         }
149         messageCombiner.combine(vertexId, currentMessage,
150           iterator.getCurrentMessage());
151       }
152     }
153   }
154 
155   
156 
157 
158 
159 
160 
161 
162   @Override
163   public void addMessage(I vertexId, M message) throws IOException {
164     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
165     synchronized (partitionMap) {
166       M currentMessage = partitionMap.get(vertexId);
167       if (currentMessage == null) {
168         M newMessage = messageCombiner.createInitialMessage();
169         currentMessage = partitionMap.put(vertexId, newMessage);
170         if (currentMessage == null) {
171           currentMessage = newMessage;
172         }
173       }
174       messageCombiner.combine(vertexId, currentMessage, message);
175     }
176   }
177 
178   @Override
179   public void clearPartition(int partitionId) {
180     map.get(partitionId).clear();
181   }
182 
183   @Override
184   public boolean hasMessagesForVertex(I vertexId) {
185     return getPartitionMap(vertexId).containsKey(vertexId);
186   }
187 
188   @Override
189   public boolean hasMessagesForPartition(int partitionId) {
190     Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
191     return partitionMessages != null && partitionMessages.size() != 0;
192   }
193 
194   @Override
195   public Iterable<M> getVertexMessages(I vertexId) {
196     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
197     if (!partitionMap.containsKey(vertexId)) {
198       return EmptyIterable.get();
199     } else {
200       return Collections.singleton(partitionMap.get(vertexId));
201     }
202   }
203 
204   @Override
205   public void clearVertexMessages(I vertexId) {
206     getPartitionMap(vertexId).remove(vertexId);
207   }
208 
209   @Override
210   public void clearAll() {
211     map.clear();
212   }
213 
214   @Override
215   public Iterable<I> getPartitionDestinationVertices(
216       int partitionId) {
217     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
218     List<I> vertices =
219         Lists.newArrayListWithCapacity(partitionMap.size());
220     Iterator<I> iterator = partitionMap.fastKeyIterator();
221     while (iterator.hasNext()) {
222       vertices.add(idTypeOps.createCopy(iterator.next()));
223     }
224     return vertices;
225   }
226 
227   @Override
228   public void writePartition(DataOutput out,
229       int partitionId) throws IOException {
230     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
231     partitionMap.write(out);
232   }
233 
234   @Override
235   public void readFieldsForPartition(DataInput in,
236       int partitionId) throws IOException {
237     Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
238         messageWriter);
239     partitionMap.readFields(in);
240     synchronized (map) {
241       map.put(partitionId, partitionMap);
242     }
243   }
244 
245   @Override
246   public void finalizeStore() {
247   }
248 
249   @Override
250   public boolean isPointerListEncoding() {
251     return false;
252   }
253 }