This project has retired. For details please refer to its
        
        Attic page.
      
 
ByteArrayVertexIdMessages xref
1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.giraph.utils;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  
25  import org.apache.giraph.factories.MessageValueFactory;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  
29  
30  
31  
32  
33  
34  
35  @SuppressWarnings("unchecked")
36  public class ByteArrayVertexIdMessages<I extends WritableComparable,
37    M extends Writable> extends ByteArrayVertexIdData<I, M>
38    implements VertexIdMessages<I, M> {
39    
40    private final MessageValueFactory<M> messageValueFactory;
41    
42    private boolean useMessageSizeEncoding = false;
43  
44    
45  
46  
47  
48  
49    public ByteArrayVertexIdMessages(
50        MessageValueFactory<M> messageValueFactory) {
51      this.messageValueFactory = messageValueFactory;
52    }
53  
54    
55  
56  
57  
58  
59    private void setUseMessageSizeEncoding() {
60      if (!getConf().useOutgoingMessageCombiner()) {
61        useMessageSizeEncoding = getConf().useMessageSizeEncoding();
62      } else {
63        useMessageSizeEncoding = false;
64      }
65    }
66  
67    @Override
68    public M createData() {
69      return messageValueFactory.newInstance();
70    }
71  
72    @Override
73    public void writeData(ExtendedDataOutput out, M message) throws IOException {
74      message.write(out);
75    }
76  
77    @Override
78    public void readData(ExtendedDataInput in, M message) throws IOException {
79      message.readFields(in);
80    }
81  
82    @Override
83    public void initialize() {
84      super.initialize();
85      setUseMessageSizeEncoding();
86    }
87  
88    @Override
89    public void initialize(int expectedSize) {
90      super.initialize(expectedSize);
91      setUseMessageSizeEncoding();
92    }
93  
94    @Override
95    public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
96      return new ByteStructVertexIdMessageIterator<>(this);
97    }
98  
99    @Override
100   public void add(I vertexId, M message) {
101     if (!useMessageSizeEncoding) {
102       super.add(vertexId, message);
103     } else {
104       try {
105         vertexId.write(extendedDataOutput);
106         writeMessageWithSize(message);
107       } catch (IOException e) {
108         throw new IllegalStateException("add: IOException occurred");
109       }
110     }
111   }
112 
113   @Override
114   public void add(byte[] serializedId, int idPos, M message) {
115     if (!useMessageSizeEncoding) {
116       super.add(serializedId, idPos, message);
117     } else {
118       try {
119         extendedDataOutput.write(serializedId, 0, idPos);
120         writeMessageWithSize(message);
121       } catch (IOException e) {
122         throw new IllegalStateException("add: IOException occurred");
123       }
124     }
125   }
126 
127   
128 
129 
130 
131 
132   private void writeMessageWithSize(M message) throws IOException {
133     int pos = extendedDataOutput.getPos();
134     extendedDataOutput.skipBytes(4);
135     writeData(extendedDataOutput, message);
136     extendedDataOutput.writeInt(
137         pos, extendedDataOutput.getPos() - pos - 4);
138   }
139 
140   @Override
141   public ByteStructVertexIdMessageBytesIterator<I, M>
142   getVertexIdMessageBytesIterator() {
143     if (!useMessageSizeEncoding) {
144       return null;
145     }
146     return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
147       @Override
148       public void writeCurrentMessageBytes(DataOutput dataOutput) {
149         try {
150           dataOutput.write(extendedDataOutput.getByteArray(),
151             messageOffset, messageBytes);
152         } catch (NegativeArraySizeException e) {
153           VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
154         } catch (IOException e) {
155           throw new IllegalStateException("writeCurrentMessageBytes: Got " +
156               "IOException", e);
157         }
158       }
159     };
160   }
161 
162   @Override
163   public void write(DataOutput dataOutput) throws IOException {
164     dataOutput.writeBoolean(useMessageSizeEncoding);
165     super.write(dataOutput);
166   }
167 
168   @Override
169   public void readFields(DataInput dataInput) throws IOException {
170     useMessageSizeEncoding = dataInput.readBoolean();
171     super.readFields(dataInput);
172   }
173 }