This project has retired. For details please refer to its Attic page.
ByteArrayEdges xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
import com.google.common.collect.UnmodifiableIterator;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
37  
38  /**
39   * {@link OutEdges} implementation backed by a byte array.
40   * Parallel edges are allowed.
41   * Note: this implementation is optimized for space usage,
42   * but edge removals are expensive.
43   *
44   * @param <I> Vertex id
45   * @param <E> Edge value
46   */
47  public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
48      extends ConfigurableOutEdges<I, E>
49      implements ReuseObjectsOutEdges<I, E>, Trimmable {
50    /** Serialized edges. */
51    private byte[] serializedEdges;
52    /** Number of bytes used in serializedEdges. */
53    private int serializedEdgesBytesUsed;
54    /** Number of edges. */
55    private int edgeCount;
56  
57    @Override
58    public void initialize(Iterable<Edge<I, E>> edges) {
59      ExtendedDataOutput extendedOutputStream =
60          getConf().createExtendedDataOutput();
61      for (Edge<I, E> edge : edges) {
62        try {
63          WritableUtils.writeEdge(extendedOutputStream, edge);
64        } catch (IOException e) {
65          throw new IllegalStateException("initialize: Failed to serialize " +
66              edge);
67        }
68        ++edgeCount;
69      }
70      serializedEdges = extendedOutputStream.getByteArray();
71      serializedEdgesBytesUsed = extendedOutputStream.getPos();
72    }
73  
74    @Override
75    public void initialize(int capacity) {
76      // We have no way to know the size in bytes used by a certain
77      // number of edges.
78      initialize();
79    }
80  
81    @Override
82    public void initialize() {
83      // No-op: no need to initialize the byte-array if there are no edges,
84      // since add() and iterator() work fine with a null buffer.
85    }
86  
87    @Override
88    public void add(Edge<I, E> edge) {
89      ExtendedDataOutput extendedDataOutput =
90          getConf().createExtendedDataOutput(
91              serializedEdges, serializedEdgesBytesUsed);
92      try {
93        WritableUtils.writeEdge(extendedDataOutput, edge);
94      } catch (IOException e) {
95        throw new IllegalStateException("add: Failed to write to the new " +
96            "byte array");
97      } catch (NegativeArraySizeException negativeArraySizeException) {
98        throw new IllegalStateException("add: Too many edges for a vertex, " +
99          "hence failed to write to byte array");
100     }
101     serializedEdges = extendedDataOutput.getByteArray();
102     serializedEdgesBytesUsed = extendedDataOutput.getPos();
103     ++edgeCount;
104   }
105 
106   @Override
107   public void remove(I targetVertexId) {
108     // Note that this is very expensive (deserializes all edges).
109     ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
110     List<Integer> foundStartOffsets = new LinkedList<Integer>();
111     List<Integer> foundEndOffsets = new LinkedList<Integer>();
112     int lastStartOffset = 0;
113     while (iterator.hasNext()) {
114       Edge<I, E> edge = iterator.next();
115       if (edge.getTargetVertexId().equals(targetVertexId)) {
116         foundStartOffsets.add(lastStartOffset);
117         foundEndOffsets.add(iterator.extendedDataInput.getPos());
118         --edgeCount;
119       }
120       lastStartOffset = iterator.extendedDataInput.getPos();
121     }
122     foundStartOffsets.add(serializedEdgesBytesUsed);
123 
124     Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
125     Integer foundStartOffset = foundStartOffsetIter.next();
126     for (Integer foundEndOffset : foundEndOffsets) {
127       Integer nextFoundStartOffset = foundStartOffsetIter.next();
128       System.arraycopy(serializedEdges, foundEndOffset,
129           serializedEdges, foundStartOffset,
130           nextFoundStartOffset - foundEndOffset);
131       serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
132       foundStartOffset = nextFoundStartOffset;
133     }
134   }
135 
136   @Override
137   public int size() {
138     return edgeCount;
139   }
140 
141   @Override
142   public void trim() {
143     if (serializedEdges != null &&
144         serializedEdges.length > serializedEdgesBytesUsed) {
145       serializedEdges =
146           Arrays.copyOf(serializedEdges, serializedEdgesBytesUsed);
147     }
148   }
149 
150   /**
151    * Iterator that reuses the same Edge object.
152    */
153   private class ByteArrayEdgeIterator
154       extends UnmodifiableIterator<Edge<I, E>> {
155     /** Input for processing the bytes */
156     private ExtendedDataInput extendedDataInput =
157         getConf().createExtendedDataInput(
158             serializedEdges, 0, serializedEdgesBytesUsed);
159     /** Representative edge object. */
160     private ReusableEdge<I, E> representativeEdge =
161         getConf().createReusableEdge();
162 
163     @Override
164     public boolean hasNext() {
165       return serializedEdges != null && !extendedDataInput.endOfInput();
166     }
167 
168     @Override
169     public Edge<I, E> next() {
170       try {
171         WritableUtils.readEdge(extendedDataInput, representativeEdge);
172       } catch (IOException e) {
173         throw new IllegalStateException("next: Failed on pos " +
174             extendedDataInput.getPos() + " edge " + representativeEdge);
175       }
176       return representativeEdge;
177     }
178   }
179 
180   @Override
181   public Iterator<Edge<I, E>> iterator() {
182     if (edgeCount == 0) {
183       return Collections.emptyListIterator();
184     } else {
185       return new ByteArrayEdgeIterator();
186     }
187   }
188 
189   @Override
190   public void readFields(DataInput in) throws IOException {
191     serializedEdgesBytesUsed = in.readInt();
192     if (serializedEdgesBytesUsed > 0) {
193       // Only create a new buffer if the old one isn't big enough
194       if (serializedEdges == null ||
195           serializedEdgesBytesUsed > serializedEdges.length) {
196         serializedEdges = new byte[serializedEdgesBytesUsed];
197       }
198       in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
199     }
200     edgeCount = in.readInt();
201   }
202 
203   @Override
204   public void write(DataOutput out) throws IOException {
205     out.writeInt(serializedEdgesBytesUsed);
206     if (serializedEdgesBytesUsed > 0) {
207       out.write(serializedEdges, 0, serializedEdgesBytesUsed);
208     }
209     out.writeInt(edgeCount);
210   }
211 }