View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages;
20  
21  import it.unimi.dsi.fastutil.longs.LongArrayList;
22  import org.apache.giraph.bsp.CentralizedServiceWorker;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.factories.MessageValueFactory;
25  import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
26  import org.apache.giraph.utils.ExtendedDataOutput;
27  import org.apache.giraph.utils.VertexIdMessageIterator;
28  import org.apache.giraph.utils.VertexIdMessages;
29  import org.apache.hadoop.io.Writable;
30  import org.apache.hadoop.io.WritableComparable;
31  
32  import java.io.DataInput;
33  import java.io.DataOutput;
34  import java.io.IOException;
35  import java.util.concurrent.ConcurrentMap;
36  
37  import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
38  
39  /**
40   * Implementation of {@link SimpleMessageStore} where multiple messages are
41   * stored as a list of long pointers to extended data output objects
42   * Used when there is no combiner provided.
43   *
44   * @param <I> vertexId type
45   * @param <M> message type
46   */
47  public class PointerListPerVertexStore<I extends WritableComparable,
48    M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
49  
50    /** Buffers of byte array outputs used to store messages - thread safe */
51    private final ExtendedByteArrayOutputBuffer bytesBuffer;
52  
53    /**
54     * Constructor
55     *
56     * @param messageValueFactory Message class held in the store
57     * @param service Service worker
58     * @param config Hadoop configuration
59     */
60    public PointerListPerVertexStore(
61        MessageValueFactory<M> messageValueFactory,
62        CentralizedServiceWorker<I, ?, ?> service,
63        ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
64      super(messageValueFactory, service, config);
65      bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
66    }
67  
68    @Override
69    public boolean isPointerListEncoding() {
70      return true;
71    }
72  
73    @Override
74    protected LongArrayList createList() {
75      return new LongArrayList();
76    }
77  
78    @Override
79    public void addPartitionMessages(
80      int partitionId, VertexIdMessages<I, M> messages) {
81      try {
82        VertexIdMessageIterator<I, M> vertexIdMessageIterator =
83            messages.getVertexIdMessageIterator();
84        long pointer = 0;
85        LongArrayList list;
86        while (vertexIdMessageIterator.hasNext()) {
87          vertexIdMessageIterator.next();
88          M msg = vertexIdMessageIterator.getCurrentMessage();
89          list = getOrCreateList(vertexIdMessageIterator);
90          if (vertexIdMessageIterator.isNewMessage()) {
91            IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
92            pointer = indexAndDataOut.getIndex();
93            pointer <<= 32;
94            ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
95            pointer += dataOutput.getPos();
96            msg.write(dataOutput);
97          }
98          synchronized (list) {
99            list.add(pointer);
100         }
101       }
102     } catch (IOException e) {
103       throw new RuntimeException("addPartitionMessages: IOException while" +
104           " adding messages for a partition: " + e);
105     }
106   }
107 
108   /**
109    * Get messages as an iterable from message storage
110    *
111    * @param pointers list of pointers to messages
112    * @return Messages as an iterable
113    */
114   @Override
115   public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
116     return new PointerListMessagesIterable<>(messageValueFactory, pointers,
117       bytesBuffer);
118   }
119 
120   @Override
121   protected int getNumberOfMessagesIn(ConcurrentMap<I,
122     LongArrayList> partitionMap) {
123     int numberOfMessages = 0;
124     for (LongArrayList list : partitionMap.values()) {
125       numberOfMessages += list.size();
126     }
127     return numberOfMessages;
128   }
129 
130   // FIXME -- complete these for check-pointing
131   @Override
132   protected void writeMessages(LongArrayList messages, DataOutput out)
133     throws IOException {
134 
135   }
136 
137   @Override
138   protected LongArrayList readFieldsForMessages(DataInput in)
139     throws IOException {
140     return null;
141   }
142 }