This project has been retired. For details, please refer to its Attic page.
      
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.comm.messages;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.Collections;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.giraph.bsp.CentralizedServiceWorker;
28  import org.apache.giraph.combiner.MessageCombiner;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.VertexIdMessageIterator;
33  import org.apache.giraph.utils.VertexIdMessages;
34  import org.apache.giraph.utils.WritableUtils;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  public class OneMessagePerVertexStore<I extends WritableComparable,
47      M extends Writable> extends SimpleMessageStore<I, M, M> {
48    
49    private final MessageCombiner<? super I, M> messageCombiner;
50  
51    
52  
53  
54  
55  
56  
57    public OneMessagePerVertexStore(
58      MessageValueFactory<M> messageValueFactory,
59      PartitionSplitInfo<I> partitionInfo,
60      MessageCombiner<? super I, M> messageCombiner,
61      ImmutableClassesGiraphConfiguration<I, ?, ?> config
62    ) {
63      super(messageValueFactory, partitionInfo, config);
64      this.messageCombiner =
65          messageCombiner;
66    }
67  
68    @Override
69    public boolean isPointerListEncoding() {
70      return false;
71    }
72  
73    @Override
74    public void addPartitionMessages(
75        int partitionId,
76        VertexIdMessages<I, M> messages) {
77      ConcurrentMap<I, M> partitionMap =
78          getOrCreatePartitionMap(partitionId);
79      VertexIdMessageIterator<I, M> vertexIdMessageIterator =
80        messages.getVertexIdMessageIterator();
81      
82      
83      while (vertexIdMessageIterator.hasNext()) {
84        vertexIdMessageIterator.next();
85        I vertexId = vertexIdMessageIterator.getCurrentVertexId();
86        M currentMessage =
87            partitionMap.get(vertexIdMessageIterator.getCurrentVertexId());
88        if (currentMessage == null) {
89          M newMessage = messageCombiner.createInitialMessage();
90          currentMessage = partitionMap.putIfAbsent(
91              vertexIdMessageIterator.releaseCurrentVertexId(), newMessage);
92          if (currentMessage == null) {
93            currentMessage = newMessage;
94          }
95        }
96        synchronized (currentMessage) {
97          messageCombiner.combine(vertexId, currentMessage,
98              vertexIdMessageIterator.getCurrentMessage());
99        }
100     }
101   }
102 
103   @Override
104   public void addMessage(I vertexId, M message) throws IOException {
105     ConcurrentMap<I, M> partitionMap =
106       getOrCreatePartitionMap(getPartitionId(vertexId));
107     M currentMessage = partitionMap.get(vertexId);
108     if (currentMessage == null) {
109       M newMessage = messageCombiner.createInitialMessage();
110       
111       I copyId = WritableUtils.createCopy(vertexId);
112       currentMessage = partitionMap.putIfAbsent(copyId, newMessage);
113       if (currentMessage == null) {
114         currentMessage = newMessage;
115       }
116     }
117     synchronized (currentMessage) {
118       messageCombiner.combine(vertexId, currentMessage, message);
119     }
120   }
121 
122   @Override
123   protected Iterable<M> getMessagesAsIterable(M message) {
124     return Collections.singleton(message);
125   }
126 
127   @Override
128   protected int getNumberOfMessagesIn(ConcurrentMap<I, M> partitionMap) {
129     return partitionMap.size();
130   }
131 
132   @Override
133   protected void writeMessages(M messages, DataOutput out) throws IOException {
134     messages.write(out);
135   }
136 
137   @Override
138   protected M readFieldsForMessages(DataInput in) throws IOException {
139     M message = messageValueFactory.newInstance();
140     message.readFields(in);
141     return message;
142   }
143 
144 
145   
146 
147 
148 
149 
150 
151 
152 
153 
154   public static <I extends WritableComparable, M extends Writable>
155   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
156       CentralizedServiceWorker<I, ?, ?> service,
157       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
158     return new Factory<I, M>(service, config);
159   }
160 
161   
162 
163 
164 
165 
166 
167   private static class Factory<I extends WritableComparable,
168       M extends Writable>
169       implements MessageStoreFactory<I, M, MessageStore<I, M>> {
170     
171     private PartitionSplitInfo<I> partitionInfo;
172     
173     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
174 
175     
176 
177 
178 
179     public Factory(
180       PartitionSplitInfo<I> partitionInfo,
181       ImmutableClassesGiraphConfiguration<I, ?, ?> config
182     ) {
183       this.partitionInfo = partitionInfo;
184       this.config = config;
185     }
186 
187     @Override
188     public MessageStore<I, M> newStore(
189         MessageClasses<I, M> messageClasses) {
190       return new OneMessagePerVertexStore<I, M>(
191           messageClasses.createMessageValueFactory(config), partitionInfo,
192           messageClasses.createMessageCombiner(config), config);
193     }
194 
195     @Override
196     public void initialize(PartitionSplitInfo<I> partitionInfo,
197         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
198       this.partitionInfo = partitionInfo;
199       this.config = conf;
200     }
201   }
202 }