View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.block_app.framework.api.local;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ThreadLocalRandom;
28  
29  import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
30  import org.apache.giraph.combiner.MessageCombiner;
31  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
32  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
33  import org.apache.giraph.conf.MessageClasses;
34  import org.apache.giraph.factories.MessageValueFactory;
35  import org.apache.giraph.types.ops.TypeOps;
36  import org.apache.giraph.types.ops.TypeOpsUtils;
37  import org.apache.giraph.utils.ExtendedDataInput;
38  import org.apache.giraph.utils.ExtendedDataOutput;
39  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
40  import org.apache.giraph.utils.WritableUtils;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  
44  import com.google.common.collect.AbstractIterator;
45  
46  /**
47   * Interface for internal message store, used by LocalBlockRunner
48   *
49   * @param <I> Vertex id type
50   * @param <M> Message type
51   */
52  @SuppressWarnings("rawtypes")
53  interface InternalMessageStore
54      <I extends WritableComparable, M extends Writable> {
55    Set<I> targetsSet();
56    Iterable<M> takeMessages(I id);
57    void sendMessage(I id, M message);
58    void sendMessageToMultipleEdges(Iterator<I> idIter, M message);
59  
60    /**
61     * Abstract Internal message store implementation that uses
62     * ConcurrentHashMap to store objects received thus far.
63     *
64     * @param <I> Vertex id type
65     * @param <M> Message type
66     * @param <R> Receiver object that particular implementation uses
67     *            (message, array of messages, byte array, etc)
68     */
69    abstract class InternalConcurrentMessageStore
70        <I extends WritableComparable, M extends Writable, R>
71        implements InternalMessageStore<I, M> {
72      private final ConcurrentHashMap<I, R> received =
73          new ConcurrentHashMap<>();
74  
75      private final Class<I> idClass;
76      private final TypeOps<I> idTypeOps;
77  
78      InternalConcurrentMessageStore(Class<I> idClass) {
79        this.idClass = idClass;
80        idTypeOps = TypeOpsUtils.getTypeOpsOrNull(idClass);
81      }
82  
83      public I copyId(I id) {
84        if (idTypeOps != null) {
85          return idTypeOps.createCopy(id);
86        } else {
87          return WritableUtils.createCopy(id, idClass, null);
88        }
89      }
90  
91      R getReceiverFor(I id) {
92        R value = received.get(id);
93  
94        if (value == null) {
95          id = copyId(id);
96          value = createNewReceiver();
97          R oldValue = received.putIfAbsent(id, value);
98          if (oldValue != null) {
99            value = oldValue;
100         }
101       }
102       return value;
103     }
104 
105     R removeFor(I id) {
106       return received.remove(id);
107     }
108 
109     abstract R createNewReceiver();
110 
111     @Override
112     public Set<I> targetsSet() {
113       return received.keySet();
114     }
115 
116     @Override
117     public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
118       while (idIter.hasNext()) {
119         sendMessage(idIter.next(), message);
120       }
121     }
122 
123     public static <I extends WritableComparable, M extends Writable>
124     InternalMessageStore<I, M> createMessageStore(
125       final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
126       final MessageClasses<I, M> messageClasses
127     ) {
128       MessageCombiner<? super I, M> combiner =
129           messageClasses.createMessageCombiner(conf);
130       if (combiner != null) {
131         return new InternalCombinerMessageStore<>(
132             conf.getVertexIdClass(), combiner);
133       } else if (messageClasses.getMessageEncodeAndStoreType().equals(
134           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
135         return new InternalSharedByteMessageStore<>(
136             conf.getVertexIdClass(),
137             messageClasses.createMessageValueFactory(conf));
138       } else {
139         return new InternalByteMessageStore<>(
140           conf.getVertexIdClass(),
141           messageClasses.createMessageValueFactory(conf),
142           conf);
143       }
144     }
145 
146     public static <I extends WritableComparable, M extends Writable>
147     InternalMessageStore<I, M> createMessageStore(
148         final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
149         final BlockWorkerPieces pieces, boolean runAllChecks) {
150       @SuppressWarnings("unchecked")
151       MessageClasses<I, M> messageClasses =
152           pieces.getOutgoingMessageClasses(conf);
153 
154       InternalMessageStore<I, M> messageStore =
155           createMessageStore(conf, messageClasses);
156       if (runAllChecks) {
157         return new InternalChecksMessageStore<I, M>(
158             messageStore, conf, messageClasses.createMessageValueFactory(conf));
159       } else {
160         return messageStore;
161       }
162     }
163   }
164 
165   /**
166    * InternalMessageStore that combines messages as they are received.
167    *
168    * @param <I> Vertex id value type
169    * @param <M> Message type
170    */
171   static class InternalCombinerMessageStore
172       <I extends WritableComparable, M extends Writable>
173       extends InternalConcurrentMessageStore<I, M, M> {
174     private final MessageCombiner<? super I, M> messageCombiner;
175 
176     public InternalCombinerMessageStore(Class<I> idClass,
177         MessageCombiner<? super I, M> messageCombiner) {
178       super(idClass);
179       this.messageCombiner = messageCombiner;
180     }
181 
182     @Override
183     public Iterable<M> takeMessages(I id) {
184       M message = removeFor(id);
185       if (message != null) {
186         return Collections.singleton(message);
187       } else {
188         return null;
189       }
190     }
191 
192     @Override
193     public void sendMessage(I id, M message) {
194       M mainMessage = getReceiverFor(id);
195       synchronized (mainMessage) {
196         messageCombiner.combine(id, mainMessage, message);
197       }
198     }
199 
200     @Override
201     M createNewReceiver() {
202       return messageCombiner.createInitialMessage();
203     }
204   }
205 
206   /**
207    * InternalMessageStore that keeps messages for each vertex in byte array.
208    *
209    * @param <I> Vertex id value type
210    * @param <M> Message type
211    */
212   static class InternalByteMessageStore
213       <I extends WritableComparable, M extends Writable>
214       extends InternalConcurrentMessageStore<I, M,
215           ExtendedDataOutput> {
216     private final MessageValueFactory<M> messageFactory;
217     private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
218 
219     public InternalByteMessageStore(
220       Class<I> idClass, MessageValueFactory<M> messageFactory,
221       ImmutableClassesGiraphConfiguration<I, ?, ?> conf
222     ) {
223       super(idClass);
224       this.messageFactory = messageFactory;
225       this.conf = conf;
226     }
227 
228     @Override
229     public Iterable<M> takeMessages(I id) {
230       final ExtendedDataOutput out = removeFor(id);
231       if (out == null) {
232         return null;
233       }
234 
235       return new Iterable<M>() {
236         @Override
237         public Iterator<M> iterator() {
238           final ExtendedDataInput in = conf.createExtendedDataInput(
239             out.getByteArray(), 0, out.getPos()
240           );
241 
242           final M message = messageFactory.newInstance();
243           return new AbstractIterator<M>() {
244             @Override
245             protected M computeNext() {
246               if (in.available() == 0) {
247                 return endOfData();
248               }
249               try {
250                 message.readFields(in);
251               } catch (IOException e) {
252                 throw new RuntimeException(e);
253               }
254               return message;
255             }
256           };
257         }
258       };
259     }
260 
261     @Override
262     public void sendMessage(I id, M message) {
263       ExtendedDataOutput out = getReceiverFor(id);
264 
265       synchronized (out) {
266         try {
267           message.write(out);
268         } catch (IOException e) {
269           throw new RuntimeException(e);
270         }
271       }
272     }
273 
274     @Override
275     ExtendedDataOutput createNewReceiver() {
276       return conf.createExtendedDataOutput();
277     }
278   }
279 
280   /**
281    * InternalMessageStore that creates byte[] for each message, and
282    * all receivers share the same byte[].
283    *
284    * @param <I> Vertex id value type
285    * @param <M> Message type
286    */
287   static class InternalSharedByteMessageStore
288       <I extends WritableComparable, M extends Writable>
289       extends InternalConcurrentMessageStore<I, M, List<byte[]>> {
290     private final MessageValueFactory<M> messageFactory;
291 
292     public InternalSharedByteMessageStore(
293         Class<I> idClass, MessageValueFactory<M> messageFactory) {
294       super(idClass);
295       this.messageFactory = messageFactory;
296     }
297 
298     @Override
299     public Iterable<M> takeMessages(I id) {
300       final List<byte[]> out = removeFor(id);
301       if (out == null) {
302         return null;
303       }
304 
305       return new Iterable<M>() {
306         @Override
307         public Iterator<M> iterator() {
308           final Iterator<byte[]> byteIter = out.iterator();
309           final M message = messageFactory.newInstance();
310           final UnsafeReusableByteArrayInput reusableInput =
311               new UnsafeReusableByteArrayInput();
312 
313           return new Iterator<M>() {
314             @Override
315             public boolean hasNext() {
316               return byteIter.hasNext();
317             }
318 
319             @Override
320             public M next() {
321               WritableUtils.fromByteArrayUnsafe(
322                   byteIter.next(), message, reusableInput);
323               return message;
324             }
325 
326             @Override
327             public void remove() {
328               byteIter.remove();
329             }
330           };
331         }
332       };
333     }
334 
335     private void storeMessage(I id, byte[] messageData) {
336       List<byte[]> out = getReceiverFor(id);
337       synchronized (out) {
338         out.add(messageData);
339       }
340     }
341 
342     @Override
343     List<byte[]> createNewReceiver() {
344       return new ArrayList<>();
345     }
346 
347     @Override
348     public void sendMessage(I id, M message) {
349       storeMessage(id, WritableUtils.toByteArrayUnsafe(message));
350     }
351 
352     @Override
353     public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) {
354       byte[] messageData = WritableUtils.toByteArrayUnsafe(message);
355       while (idIter.hasNext()) {
356         storeMessage(idIter.next(), messageData);
357       }
358     }
359   }
360 
361   /**
362    * Message store that add checks for whether serialization seems to be
363    * working fine
364    */
365   static class InternalChecksMessageStore
366       <I extends WritableComparable, M extends Writable>
367       implements InternalMessageStore<I, M> {
368     private final InternalMessageStore<I, M> messageStore;
369     private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
370     private final MessageValueFactory<M> messageFactory;
371 
372     public InternalChecksMessageStore(InternalMessageStore<I, M> messageStore,
373         ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
374         MessageValueFactory<M> messageFactory) {
375       this.messageStore = messageStore;
376       this.conf = conf;
377       this.messageFactory = messageFactory;
378     }
379 
380     // Use message copies probabilistically, to catch both not serializing some
381     // fields, and storing references from message object itself
382     // (which can be reusable).
383     private M maybeMessageCopy(M message) {
384       M messageCopy = WritableUtils.createCopy(
385           message, messageFactory, conf);
386       return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message;
387     }
388 
389     private void checkIdCopy(I id) {
390       WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf);
391     }
392 
393     @Override
394     public void sendMessage(I id, M message) {
395       checkIdCopy(id);
396       messageStore.sendMessage(id, maybeMessageCopy(message));
397     }
398 
399     @Override
400     public void sendMessageToMultipleEdges(
401         final Iterator<I> idIter, M message) {
402       messageStore.sendMessageToMultipleEdges(
403           new Iterator<I>() {
404             @Override
405             public boolean hasNext() {
406               return idIter.hasNext();
407             }
408 
409             @Override
410             public I next() {
411               I id = idIter.next();
412               checkIdCopy(id);
413               return id;
414             }
415 
416             @Override
417             public void remove() {
418               idIter.remove();
419             }
420           },
421           maybeMessageCopy(message));
422     }
423 
424     @Override
425     public Iterable<M> takeMessages(I id) {
426       checkIdCopy(id);
427       return messageStore.takeMessages(id);
428     }
429 
430     @Override
431     public Set<I> targetsSet() {
432       return messageStore.targetsSet();
433     }
434   }
435 }