This project has retired. For details please refer to its Attic page.
LongPointerListPerVertexStore xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import it.unimi.dsi.fastutil.longs.LongArrayList;
22  
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  
27  import org.apache.giraph.comm.messages.MessageStore;
28  import org.apache.giraph.comm.messages.PartitionSplitInfo;
29  import org.apache.giraph.comm.messages.PointerListMessagesIterable;
30  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
31  import org.apache.giraph.factories.MessageValueFactory;
32  import org.apache.giraph.utils.EmptyIterable;
33  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
34  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
35  import org.apache.giraph.utils.ExtendedDataOutput;
36  import org.apache.giraph.utils.VertexIdMessageIterator;
37  import org.apache.giraph.utils.VertexIdMessages;
38  import org.apache.hadoop.io.LongWritable;
39  import org.apache.hadoop.io.Writable;
40  
41  /**
42   * This stores messages in
43   * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
44   * and stores long pointers that point to serialized messages
45   *
46   * @param <M> message type
47   */
48  public class LongPointerListPerVertexStore<M extends Writable>
49    extends LongAbstractListStore<M, LongArrayList>
50    implements MessageStore<LongWritable, M> {
51  
52    /** Buffers of byte array outputs used to store messages - thread safe */
53    private final ExtendedByteArrayOutputBuffer bytesBuffer;
54  
55    /**
56     * Constructor
57     *
58     * @param messageValueFactory Factory for creating message values
59     * @param partitionInfo       Partition split info
60     * @param config              Hadoop configuration
61     */
62    public LongPointerListPerVertexStore(
63      MessageValueFactory<M> messageValueFactory,
64      PartitionSplitInfo<LongWritable> partitionInfo,
65      ImmutableClassesGiraphConfiguration<LongWritable,
66      Writable, Writable> config) {
67      super(messageValueFactory, partitionInfo, config);
68      bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
69    }
70  
71    @Override
72    public boolean isPointerListEncoding() {
73      return true;
74    }
75  
76    @Override
77    protected LongArrayList createList() {
78      return new LongArrayList();
79    }
80  
81    @Override
82    public void addPartitionMessages(
83      int partitionId,
84      VertexIdMessages<LongWritable, M> messages
85    ) {
86      try {
87        VertexIdMessageIterator<LongWritable, M> iterator =
88            messages.getVertexIdMessageIterator();
89        long pointer = 0;
90        LongArrayList list;
91        while (iterator.hasNext()) {
92          iterator.next();
93          M msg = iterator.getCurrentMessage();
94          list = getList(iterator.getCurrentVertexId());
95  
96          if (iterator.isNewMessage()) {
97            IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
98            pointer = indexAndDataOut.getIndex();
99            pointer <<= 32;
100           ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
101           pointer += dataOutput.getPos();
102           msg.write(dataOutput);
103         }
104         synchronized (list) { // TODO - any better way?
105           list.add(pointer);
106         }
107       }
108     } catch (IOException e) {
109       throw new RuntimeException("addPartitionMessages: IOException while" +
110           " adding messages for a partition: " + e);
111     }
112   }
113 
114   @Override
115   public void addMessage(LongWritable vertexId, M message) throws IOException {
116     LongArrayList list = getList(vertexId);
117     IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
118     long pointer = indexAndDataOut.getIndex();
119     pointer <<= 32;
120     ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
121     pointer += dataOutput.getPos();
122     message.write(dataOutput);
123 
124     synchronized (list) {
125       list.add(pointer);
126     }
127   }
128 
129   @Override
130   public Iterable<M> getVertexMessages(LongWritable vertexId) {
131     LongArrayList list = getPartitionMap(vertexId).get(
132         vertexId.get());
133     if (list == null) {
134       return EmptyIterable.get();
135     } else {
136       return new PointerListMessagesIterable<>(messageValueFactory,
137         list, bytesBuffer);
138     }
139   }
140 
141   // FIXME -- complete these for check-pointing
142   @Override
143   public void writePartition(DataOutput out, int partitionId)
144     throws IOException {
145   }
146 
147   @Override
148   public void readFieldsForPartition(DataInput in, int partitionId)
149     throws IOException {
150   }
151 }