This project has retired. For details please refer to its Attic page.
ByteArrayVertexIdMessages xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.utils;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  
25  import org.apache.giraph.factories.MessageValueFactory;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  /**
30   * Stores vertex id and message pairs in a single byte array.
31   *
32   * @param <I> Vertex id
33   * @param <M> Message data
34   */
35  @SuppressWarnings("unchecked")
36  public class ByteArrayVertexIdMessages<I extends WritableComparable,
37    M extends Writable> extends ByteArrayVertexIdData<I, M>
38    implements VertexIdMessages<I, M> {
39    /** Message value class */
40    private final MessageValueFactory<M> messageValueFactory;
41    /** Add the message size to the stream? (Depends on the message store) */
42    private boolean useMessageSizeEncoding = false;
43  
44    /**
45     * Constructor
46     *
47     * @param messageValueFactory Class for messages
48     */
49    public ByteArrayVertexIdMessages(
50        MessageValueFactory<M> messageValueFactory) {
51      this.messageValueFactory = messageValueFactory;
52    }
53  
54    /**
55     * Set whether message sizes should be encoded.  This should only be a
56     * possibility when not combining.  When combining, all messages need to be
57     * de-serialized right away, so this won't help.
58     */
59    private void setUseMessageSizeEncoding() {
60      if (!getConf().useOutgoingMessageCombiner()) {
61        useMessageSizeEncoding = getConf().useMessageSizeEncoding();
62      } else {
63        useMessageSizeEncoding = false;
64      }
65    }
66  
67    @Override
68    public M createData() {
69      return messageValueFactory.newInstance();
70    }
71  
72    @Override
73    public void writeData(ExtendedDataOutput out, M message) throws IOException {
74      message.write(out);
75    }
76  
77    @Override
78    public void readData(ExtendedDataInput in, M message) throws IOException {
79      message.readFields(in);
80    }
81  
82    @Override
83    public void initialize() {
84      super.initialize();
85      setUseMessageSizeEncoding();
86    }
87  
88    @Override
89    public void initialize(int expectedSize) {
90      super.initialize(expectedSize);
91      setUseMessageSizeEncoding();
92    }
93  
94    @Override
95    public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
96      return new ByteStructVertexIdMessageIterator<>(this);
97    }
98  
99    @Override
100   public void add(I vertexId, M message) {
101     if (!useMessageSizeEncoding) {
102       super.add(vertexId, message);
103     } else {
104       try {
105         vertexId.write(extendedDataOutput);
106         writeMessageWithSize(message);
107       } catch (IOException e) {
108         throw new IllegalStateException("add: IOException occurred");
109       }
110     }
111   }
112 
113   @Override
114   public void add(byte[] serializedId, int idPos, M message) {
115     if (!useMessageSizeEncoding) {
116       super.add(serializedId, idPos, message);
117     } else {
118       try {
119         extendedDataOutput.write(serializedId, 0, idPos);
120         writeMessageWithSize(message);
121       } catch (IOException e) {
122         throw new IllegalStateException("add: IOException occurred");
123       }
124     }
125   }
126 
127   /**
128    * Write a size of the message and message
129    *
130    * @param message Message to write
131    */
132   private void writeMessageWithSize(M message) throws IOException {
133     int pos = extendedDataOutput.getPos();
134     extendedDataOutput.skipBytes(4);
135     writeData(extendedDataOutput, message);
136     extendedDataOutput.writeInt(
137         pos, extendedDataOutput.getPos() - pos - 4);
138   }
139 
140   @Override
141   public ByteStructVertexIdMessageBytesIterator<I, M>
142   getVertexIdMessageBytesIterator() {
143     if (!useMessageSizeEncoding) {
144       return null;
145     }
146     return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
147       @Override
148       public void writeCurrentMessageBytes(DataOutput dataOutput) {
149         try {
150           dataOutput.write(extendedDataOutput.getByteArray(),
151             messageOffset, messageBytes);
152         } catch (NegativeArraySizeException e) {
153           VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
154         } catch (IOException e) {
155           throw new IllegalStateException("writeCurrentMessageBytes: Got " +
156               "IOException", e);
157         }
158       }
159     };
160   }
161 
162   @Override
163   public void write(DataOutput dataOutput) throws IOException {
164     dataOutput.writeBoolean(useMessageSizeEncoding);
165     super.write(dataOutput);
166   }
167 
168   @Override
169   public void readFields(DataInput dataInput) throws IOException {
170     useMessageSizeEncoding = dataInput.readBoolean();
171     super.readFields(dataInput);
172   }
173 }