View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.messages.primitives.long_id;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.comm.messages.MessagesIterable;
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.factories.MessageValueFactory;
25  import org.apache.giraph.utils.VertexIdMessageBytesIterator;
26  import org.apache.giraph.utils.VertexIdMessageIterator;
27  import org.apache.giraph.utils.VertexIdMessages;
28  import org.apache.giraph.utils.VerboseByteStructMessageWrite;
29  import org.apache.giraph.utils.EmptyIterable;
30  import org.apache.giraph.utils.io.DataInputOutput;
31  import org.apache.hadoop.io.LongWritable;
32  import org.apache.hadoop.io.Writable;
33  
34  import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
35  import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
36  import it.unimi.dsi.fastutil.objects.ObjectIterator;
37  
38  import java.io.DataInput;
39  import java.io.DataOutput;
40  import java.io.IOException;
41  
42  /**
43   * Special message store to be used when ids are LongWritable and no combiner
44   * is used.
45   * Uses fastutil primitive maps in order to decrease number of objects and
46   * get better performance.
47   *
48   * @param <M> Message type
49   */
50  public class LongByteArrayMessageStore<M extends Writable>
51    extends LongAbstractMessageStore<M, DataInputOutput> {
52  
53    /**
54     * Constructor
55     *
56     * @param messageValueFactory Factory for creating message values
57     * @param service             Service worker
58     * @param config              Hadoop configuration
59     */
60    public LongByteArrayMessageStore(
61        MessageValueFactory<M> messageValueFactory,
62        CentralizedServiceWorker<LongWritable, Writable, Writable> service,
63        ImmutableClassesGiraphConfiguration<LongWritable,
64            Writable, Writable> config) {
65      super(messageValueFactory, service, config);
66    }
67  
68    @Override
69    public boolean isPointerListEncoding() {
70      return false;
71    }
72  
73    /**
74     * Get the DataInputOutput for a vertex id, creating if necessary.
75     *
76     * @param partitionMap Partition map to look in
77     * @param vertexId Id of the vertex
78     * @return DataInputOutput for this vertex id (created if necessary)
79     */
80    private DataInputOutput getDataInputOutput(
81      Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
82      DataInputOutput dataInputOutput = partitionMap.get(vertexId);
83      if (dataInputOutput == null) {
84        dataInputOutput = config.createMessagesInputOutput();
85        partitionMap.put(vertexId, dataInputOutput);
86      }
87      return dataInputOutput;
88    }
89  
90    @Override
91    public void addPartitionMessages(int partitionId,
92      VertexIdMessages<LongWritable, M> messages) {
93      Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
94      synchronized (partitionMap) {
95        VertexIdMessageBytesIterator<LongWritable, M>
96            vertexIdMessageBytesIterator =
97            messages.getVertexIdMessageBytesIterator();
98        // Try to copy the message buffer over rather than
99        // doing a deserialization of a message just to know its size.  This
100       // should be more efficient for complex objects where serialization is
101       // expensive.  If this type of iterator is not available, fall back to
102       // deserializing/serializing the messages
103       if (vertexIdMessageBytesIterator != null) {
104         while (vertexIdMessageBytesIterator.hasNext()) {
105           vertexIdMessageBytesIterator.next();
106           DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
107               vertexIdMessageBytesIterator.getCurrentVertexId().get());
108           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
109               dataInputOutput.getDataOutput());
110         }
111       } else {
112         try {
113           VertexIdMessageIterator<LongWritable, M>
114               iterator = messages.getVertexIdMessageIterator();
115           while (iterator.hasNext()) {
116             iterator.next();
117             DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
118                 iterator.getCurrentVertexId().get());
119             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
120                 dataInputOutput.getDataOutput());
121           }
122         } catch (IOException e) {
123           throw new RuntimeException("addPartitionMessages: IOException while" +
124               " adding messages for a partition: " + e);
125         }
126       }
127     }
128   }
129 
130   @Override
131   public void finalizeStore() {
132   }
133 
134   @Override
135   public Iterable<M> getVertexMessages(
136     LongWritable vertexId) {
137     DataInputOutput dataInputOutput =
138         getPartitionMap(vertexId).get(vertexId.get());
139     if (dataInputOutput == null) {
140       return EmptyIterable.get();
141     } else {
142       return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
143     }
144   }
145 
146   @Override
147   public void writePartition(DataOutput out, int partitionId)
148     throws IOException {
149     Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
150         map.get(partitionId);
151     out.writeInt(partitionMap.size());
152     ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
153         partitionMap.long2ObjectEntrySet().fastIterator();
154     while (iterator.hasNext()) {
155       Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
156       out.writeLong(entry.getLongKey());
157       entry.getValue().write(out);
158     }
159   }
160 
161   @Override
162   public void readFieldsForPartition(DataInput in,
163     int partitionId) throws IOException {
164     int size = in.readInt();
165     Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
166         new Long2ObjectOpenHashMap<DataInputOutput>(size);
167     while (size-- > 0) {
168       long vertexId = in.readLong();
169       DataInputOutput dataInputOutput = config.createMessagesInputOutput();
170       dataInputOutput.readFields(in);
171       partitionMap.put(vertexId, dataInputOutput);
172     }
173     synchronized (map) {
174       map.put(partitionId, partitionMap);
175     }
176   }
177 }