View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import it.unimi.dsi.fastutil.longs.LongArrayList;
22  import org.apache.giraph.bsp.CentralizedServiceWorker;
23  import org.apache.giraph.comm.messages.MessageStore;
24  import org.apache.giraph.comm.messages.PointerListMessagesIterable;
25  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
26  import org.apache.giraph.factories.MessageValueFactory;
27  import org.apache.giraph.utils.EmptyIterable;
28  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
29  import org.apache.giraph.utils.ExtendedDataOutput;
30  import org.apache.giraph.utils.VertexIdMessageIterator;
31  import org.apache.giraph.utils.VertexIdMessages;
32  import org.apache.hadoop.io.LongWritable;
33  import org.apache.hadoop.io.Writable;
34  
35  import java.io.DataInput;
36  import java.io.DataOutput;
37  import java.io.IOException;
38  
39  import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
40  
41  /**
42   * This stores messages in
43   * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
44   * and stores long pointers that point to serialized messages
45   *
46   * @param <M> message type
47   */
48  public class LongPointerListMessageStore<M extends Writable>
49    extends LongAbstractListMessageStore<M, LongArrayList>
50    implements MessageStore<LongWritable, M> {
51  
52    /** Buffers of byte array outputs used to store messages - thread safe */
53    private final ExtendedByteArrayOutputBuffer bytesBuffer;
54  
55    /**
56     * Constructor
57     *
58     * @param messageValueFactory Factory for creating message values
59     * @param service             Service worker
60     * @param config              Hadoop configuration
61     */
62    public LongPointerListMessageStore(
63      MessageValueFactory<M> messageValueFactory,
64      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
65      ImmutableClassesGiraphConfiguration<LongWritable,
66      Writable, Writable> config) {
67      super(messageValueFactory, service, config);
68      bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
69    }
70  
71    @Override
72    public boolean isPointerListEncoding() {
73      return true;
74    }
75  
76    @Override
77    protected LongArrayList createList() {
78      return new LongArrayList();
79    }
80  
81    @Override
82    public void addPartitionMessages(int partitionId,
83      VertexIdMessages<LongWritable, M> messages) {
84      try {
85        VertexIdMessageIterator<LongWritable, M> iterator =
86            messages.getVertexIdMessageIterator();
87        long pointer = 0;
88        LongArrayList list;
89        while (iterator.hasNext()) {
90          iterator.next();
91          M msg = iterator.getCurrentMessage();
92          list = getList(iterator);
93          if (iterator.isNewMessage()) {
94            IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
95            pointer = indexAndDataOut.getIndex();
96            pointer <<= 32;
97            ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
98            pointer += dataOutput.getPos();
99            msg.write(dataOutput);
100         }
101         synchronized (list) { // TODO - any better way?
102           list.add(pointer);
103         }
104       }
105     } catch (IOException e) {
106       throw new RuntimeException("addPartitionMessages: IOException while" +
107           " adding messages for a partition: " + e);
108     }
109   }
110 
111   @Override
112   public Iterable<M> getVertexMessages(
113     LongWritable vertexId) {
114     LongArrayList list = getPartitionMap(vertexId).get(
115         vertexId.get());
116     if (list == null) {
117       return EmptyIterable.get();
118     } else {
119       return new PointerListMessagesIterable<>(messageValueFactory,
120         list, bytesBuffer);
121     }
122   }
123 
124   // FIXME -- complete these for check-pointing
125   @Override
126   public void writePartition(DataOutput out, int partitionId)
127     throws IOException {
128   }
129 
130   @Override
131   public void readFieldsForPartition(DataInput in, int partitionId)
132     throws IOException {
133   }
134 }