This project has retired. For details please refer to its Attic page.
InternalMessageStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.concurrent.ThreadLocalRandom;
25  
26  import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
27  import org.apache.giraph.comm.messages.MessageStore;
28  import org.apache.giraph.comm.messages.PartitionSplitInfo;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.conf.MessageClasses;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.WritableUtils;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  
36  import com.google.common.collect.Iterators;
37  
38  /**
39   * Interface for internal message store, used by LocalBlockRunner
40   *
41   * @param <I> Vertex id type
42   * @param <M> Message type
43   */
44  @SuppressWarnings("rawtypes")
45  interface InternalMessageStore
46      <I extends WritableComparable, M extends Writable> {
47    Iterator<I> targetVertexIds();
48    boolean hasMessage(I id);
49    Iterable<M> takeMessages(I id);
50    void sendMessage(I id, M message);
51    void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
52    void finalizeStore();
53    Iterable<I> getPartitionDestinationVertices(int partitionId);
54  
55    /**
56     * A wrapper that uses InMemoryMessageStoreFactory to
57     * create MessageStore
58     *
59     * @param <I> Vertex id type
60     * @param <M> Message type
61     */
62    class InternalWrappedMessageStore
63    <I extends WritableComparable, M extends Writable>
64    implements InternalMessageStore<I, M> {
65      private final MessageStore<I, M> messageStore;
66      private final PartitionSplitInfo<I> partitionInfo;
67  
68      private InternalWrappedMessageStore(
69        ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
70        MessageStore<I, M> messageStore,
71        PartitionSplitInfo<I> partitionInfo
72      ) {
73        this.messageStore = messageStore;
74        this.partitionInfo = partitionInfo;
75      }
76  
77      public static <I extends WritableComparable, M extends Writable>
78      InternalMessageStore<I, M> create(
79        ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
80        MessageClasses<I, M> messageClasses,
81        PartitionSplitInfo<I> partitionInfo
82      ) {
83        InMemoryMessageStoreFactory<I, M> factory =
84          new InMemoryMessageStoreFactory<>();
85        factory.initialize(partitionInfo, conf);
86        return new InternalWrappedMessageStore<>(
87          conf,
88          factory.newStore(messageClasses),
89          partitionInfo
90        );
91      }
92  
93      @Override
94      public void sendMessage(I id, M message) {
95        try {
96          messageStore.addMessage(id, message);
97        } catch (IOException e) {
98          throw new RuntimeException(e);
99        }
100     }
101 
102     @Override
103     public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
104       while (idIter.hasNext()) {
105         sendMessage(idIter.next(), message);
106       }
107     }
108 
109     @Override
110     public Iterable<M> takeMessages(I id) {
111       Iterable<M> result = messageStore.getVertexMessages(id);
112       messageStore.clearVertexMessages(id);
113       return result;
114     }
115 
116     @Override
117     public Iterable<I> getPartitionDestinationVertices(int partitionId) {
118       return messageStore.getPartitionDestinationVertices(partitionId);
119     }
120 
121     @Override
122     public Iterator<I> targetVertexIds() {
123       List<Iterator<I>> iterators = new ArrayList<>();
124       for (int partition : partitionInfo.getPartitionIds()) {
125         Iterable<I> vertices =
126           messageStore.getPartitionDestinationVertices(partition);
127         iterators.add(vertices.iterator());
128       }
129       return Iterators.concat(iterators.iterator());
130     }
131 
132     @Override
133     public boolean hasMessage(I id) {
134       return messageStore.hasMessagesForVertex(id);
135     }
136 
137     @Override
138     public void finalizeStore() {
139       messageStore.finalizeStore();
140     }
141   }
142 
143   /**
144    * Message store that add checks for whether serialization seems to be
145    * working fine
146    */
147   static class InternalChecksMessageStore
148       <I extends WritableComparable, M extends Writable>
149       implements InternalMessageStore<I, M> {
150     private final InternalMessageStore<I, M> messageStore;
151     private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
152     private final MessageValueFactory<M> messageFactory;
153 
154     public InternalChecksMessageStore(
155       InternalMessageStore<I, M> messageStore,
156       ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
157       MessageValueFactory<M> messageFactory
158     ) {
159       this.messageStore = messageStore;
160       this.conf = conf;
161       this.messageFactory = messageFactory;
162     }
163 
164     // Use message copies probabilistically, to catch both not serializing some
165     // fields, and storing references from message object itself
166     // (which can be reusable).
167     private M maybeMessageCopy(M message) {
168       M messageCopy = WritableUtils.createCopy(
169           message, messageFactory, conf);
170       return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message;
171     }
172 
173     private void checkIdCopy(I id) {
174       WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf);
175     }
176 
177     @Override
178     public void sendMessage(I id, M message) {
179       checkIdCopy(id);
180       messageStore.sendMessage(id, maybeMessageCopy(message));
181     }
182 
183     @Override
184     public void sendMessageToMultipleEdges(
185         final Iterator<I> idIter, M message) {
186       messageStore.sendMessageToMultipleEdges(
187           new Iterator<I>() {
188             @Override
189             public boolean hasNext() {
190               return idIter.hasNext();
191             }
192 
193             @Override
194             public I next() {
195               I id = idIter.next();
196               checkIdCopy(id);
197               return id;
198             }
199 
200             @Override
201             public void remove() {
202               idIter.remove();
203             }
204           },
205           maybeMessageCopy(message));
206     }
207 
208     @Override
209     public Iterable<M> takeMessages(I id) {
210       checkIdCopy(id);
211       return messageStore.takeMessages(id);
212     }
213 
214     @Override
215     public boolean hasMessage(I id) {
216       return messageStore.hasMessage(id);
217     }
218 
219     @Override
220     public Iterator<I> targetVertexIds() {
221       return messageStore.targetVertexIds();
222     }
223 
224     @Override
225     public Iterable<I> getPartitionDestinationVertices(int partitionId) {
226       return messageStore.getPartitionDestinationVertices(partitionId);
227     }
228 
229     @Override
230     public void finalizeStore() {
231       messageStore.finalizeStore();
232     }
233   }
234 }