This project has retired. For details please refer to its Attic page.
PointerListMessagesIterable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import it.unimi.dsi.fastutil.longs.LongArrayList;
22  import it.unimi.dsi.fastutil.longs.LongListIterator;
23  
24  import java.io.IOException;
25  import java.util.Iterator;
26  
27  import org.apache.giraph.factories.MessageValueFactory;
28  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
29  import org.apache.giraph.utils.ExtendedDataOutput;
30  import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
31  import org.apache.hadoop.io.Writable;
32  
33  /**
34   * Create an iterable for messages based on a pointer list
35   *
36   * @param <M> messageType
37   */
38  public class PointerListMessagesIterable<M extends Writable>
39    implements Iterable<M> {
40    /** Message class */
41    private final MessageValueFactory<M> messageValueFactory;
42    /** List of pointers to messages in byte array */
43    private final LongArrayList pointers;
44    /** Holds the byte arrays of serialized messages */
45    private final ExtendedByteArrayOutputBuffer msgBuffer;
46    /** Reader to read data from byte buffer */
47    private final UnsafeReusableByteArrayInput messageReader;
48  
49    /**
50     *
51     * @param messageValueFactory message value factory
52     * @param pointers pointers to messages in buffer
53     * @param msgBuffer holds the byte arrays of serialized messages
54     */
55    public PointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
56      LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
57      this.messageValueFactory = messageValueFactory;
58      this.pointers = pointers;
59      this.msgBuffer = msgBuffer;
60      // TODO - if needed implement same for Safe as well
61      messageReader = new UnsafeReusableByteArrayInput();
62    }
63  
64    /**
65     * Create message from factory
66     *
67     * @return message instance
68     */
69    protected M createMessage() {
70      return messageValueFactory.newInstance();
71    }
72  
73    @Override
74    public Iterator<M> iterator() {
75      return new Iterator<M>() {
76        private final LongListIterator iterator = pointers.iterator();
77        private final M reusableMsg =
78          PointerListMessagesIterable.this.createMessage();
79        @Override
80        public boolean hasNext() {
81          return iterator.hasNext();
82        }
83  
84        @Override
85        public M next() {
86          long pointer = iterator.nextLong();
87          try {
88            int index = (int) (pointer >>> 32);
89            int offset = (int) pointer;
90            ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
91            messageReader.initialize(buffer.getByteArray(), offset,
92              buffer.getPos());
93            reusableMsg.readFields(messageReader);
94          } catch (IOException e) {
95            throw new IllegalStateException("Got exception : " + e);
96          }
97          return reusableMsg;
98        }
99  
100       @Override
101       public void remove() {
102         throw new UnsupportedOperationException();
103       }
104     };
105   }
106 }