This project has retired. For details please refer to its Attic page.
PointerListPerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import it.unimi.dsi.fastutil.longs.LongArrayList;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.util.concurrent.ConcurrentMap;
27  
28  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
29  import org.apache.giraph.factories.MessageValueFactory;
30  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
31  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
32  import org.apache.giraph.utils.ExtendedDataOutput;
33  import org.apache.giraph.utils.VertexIdMessageIterator;
34  import org.apache.giraph.utils.VertexIdMessages;
35  import org.apache.hadoop.io.Writable;
36  import org.apache.hadoop.io.WritableComparable;
37  
38  /**
39   * Implementation of {@link SimpleMessageStore} where multiple messages are
40   * stored as a list of long pointers to extended data output objects
41   * Used when there is no combiner provided.
42   *
43   * @param <I> vertexId type
44   * @param <M> message type
45   */
46  public class PointerListPerVertexStore<I extends WritableComparable,
47    M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
48  
49    /** Buffers of byte array outputs used to store messages - thread safe */
50    private final ExtendedByteArrayOutputBuffer bytesBuffer;
51  
52    /**
53     * Constructor
54     *
55     * @param messageValueFactory Message class held in the store
56     * @param partitionInfo Partition split info
57     * @param config Hadoop configuration
58     */
59    public PointerListPerVertexStore(
60      MessageValueFactory<M> messageValueFactory,
61      PartitionSplitInfo<I> partitionInfo,
62      ImmutableClassesGiraphConfiguration<I, ?, ?> config
63    ) {
64      super(messageValueFactory, partitionInfo, config);
65      bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
66    }
67  
68    @Override
69    public boolean isPointerListEncoding() {
70      return true;
71    }
72  
73    @Override
74    protected LongArrayList createList() {
75      return new LongArrayList();
76    }
77  
78    @Override
79    public void addPartitionMessages(
80      int partitionId, VertexIdMessages<I, M> messages) {
81      try {
82        VertexIdMessageIterator<I, M> vertexIdMessageIterator =
83            messages.getVertexIdMessageIterator();
84        long pointer = 0;
85        LongArrayList list;
86        while (vertexIdMessageIterator.hasNext()) {
87          vertexIdMessageIterator.next();
88          M msg = vertexIdMessageIterator.getCurrentMessage();
89          list = getOrCreateList(vertexIdMessageIterator);
90          if (vertexIdMessageIterator.isNewMessage()) {
91            IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
92            pointer = indexAndDataOut.getIndex();
93            pointer <<= 32;
94            ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
95            pointer += dataOutput.getPos();
96            msg.write(dataOutput);
97          }
98          synchronized (list) {
99            list.add(pointer);
100         }
101       }
102     } catch (IOException e) {
103       throw new RuntimeException("addPartitionMessages: IOException while" +
104           " adding messages for a partition: " + e);
105     }
106   }
107 
108   @Override
109   public void addMessage(I vertexId, M message) throws IOException {
110     LongArrayList list = getOrCreateList(vertexId);
111     IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
112     long pointer = indexAndDataOut.getIndex();
113     pointer <<= 32;
114     ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
115     pointer += dataOutput.getPos();
116     message.write(dataOutput);
117 
118     synchronized (list) {
119       list.add(pointer);
120     }
121   }
122 
123   /**
124    * Get messages as an iterable from message storage
125    *
126    * @param pointers list of pointers to messages
127    * @return Messages as an iterable
128    */
129   @Override
130   public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
131     return new PointerListMessagesIterable<>(messageValueFactory, pointers,
132       bytesBuffer);
133   }
134 
135   @Override
136   protected int getNumberOfMessagesIn(ConcurrentMap<I,
137     LongArrayList> partitionMap) {
138     int numberOfMessages = 0;
139     for (LongArrayList list : partitionMap.values()) {
140       numberOfMessages += list.size();
141     }
142     return numberOfMessages;
143   }
144 
145   // FIXME -- complete these for check-pointing
146   @Override
147   protected void writeMessages(LongArrayList messages, DataOutput out)
148     throws IOException {
149 
150   }
151 
152   @Override
153   protected LongArrayList readFieldsForMessages(DataInput in)
154     throws IOException {
155     return null;
156   }
157 }