View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.edge;
20  
21  import com.google.common.collect.Iterators;
22  import com.google.common.collect.UnmodifiableIterator;
23  import org.apache.giraph.utils.ExtendedDataInput;
24  import org.apache.giraph.utils.ExtendedDataOutput;
25  import org.apache.giraph.utils.Trimmable;
26  import org.apache.giraph.utils.WritableUtils;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  
30  import java.io.DataInput;
31  import java.io.DataOutput;
32  import java.io.IOException;
33  import java.util.Arrays;
34  import java.util.Iterator;
35  import java.util.LinkedList;
36  import java.util.List;
37  
38  /**
39   * {@link OutEdges} implementation backed by a byte array.
40   * Parallel edges are allowed.
41   * Note: this implementation is optimized for space usage,
42   * but edge removals are expensive.
43   *
44   * @param <I> Vertex id
45   * @param <E> Edge value
46   */
47  public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
48      extends ConfigurableOutEdges<I, E>
49      implements ReuseObjectsOutEdges<I, E>, Trimmable {
50    /** Serialized edges. */
51    private byte[] serializedEdges;
52    /** Number of bytes used in serializedEdges. */
53    private int serializedEdgesBytesUsed;
54    /** Number of edges. */
55    private int edgeCount;
56  
57    @Override
58    public void initialize(Iterable<Edge<I, E>> edges) {
59      ExtendedDataOutput extendedOutputStream =
60          getConf().createExtendedDataOutput();
61      for (Edge<I, E> edge : edges) {
62        try {
63          WritableUtils.writeEdge(extendedOutputStream, edge);
64        } catch (IOException e) {
65          throw new IllegalStateException("initialize: Failed to serialize " +
66              edge);
67        }
68        ++edgeCount;
69      }
70      serializedEdges = extendedOutputStream.getByteArray();
71      serializedEdgesBytesUsed = extendedOutputStream.getPos();
72    }
73  
74    @Override
75    public void initialize(int capacity) {
76      // We have no way to know the size in bytes used by a certain
77      // number of edges.
78      initialize();
79    }
80  
81    @Override
82    public void initialize() {
83      // No-op: no need to initialize the byte-array if there are no edges,
84      // since add() and iterator() work fine with a null buffer.
85    }
86  
87    @Override
88    public void add(Edge<I, E> edge) {
89      ExtendedDataOutput extendedDataOutput =
90          getConf().createExtendedDataOutput(
91              serializedEdges, serializedEdgesBytesUsed);
92      try {
93        WritableUtils.writeEdge(extendedDataOutput, edge);
94      } catch (IOException e) {
95        throw new IllegalStateException("add: Failed to write to the new " +
96            "byte array");
97      }
98      serializedEdges = extendedDataOutput.getByteArray();
99      serializedEdgesBytesUsed = extendedDataOutput.getPos();
100     ++edgeCount;
101   }
102 
103   @Override
104   public void remove(I targetVertexId) {
105     // Note that this is very expensive (deserializes all edges).
106     ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
107     List<Integer> foundStartOffsets = new LinkedList<Integer>();
108     List<Integer> foundEndOffsets = new LinkedList<Integer>();
109     int lastStartOffset = 0;
110     while (iterator.hasNext()) {
111       Edge<I, E> edge = iterator.next();
112       if (edge.getTargetVertexId().equals(targetVertexId)) {
113         foundStartOffsets.add(lastStartOffset);
114         foundEndOffsets.add(iterator.extendedDataInput.getPos());
115         --edgeCount;
116       }
117       lastStartOffset = iterator.extendedDataInput.getPos();
118     }
119     foundStartOffsets.add(serializedEdgesBytesUsed);
120 
121     Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
122     Integer foundStartOffset = foundStartOffsetIter.next();
123     for (Integer foundEndOffset : foundEndOffsets) {
124       Integer nextFoundStartOffset = foundStartOffsetIter.next();
125       System.arraycopy(serializedEdges, foundEndOffset,
126           serializedEdges, foundStartOffset,
127           nextFoundStartOffset - foundEndOffset);
128       serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
129       foundStartOffset = nextFoundStartOffset;
130     }
131   }
132 
133   @Override
134   public int size() {
135     return edgeCount;
136   }
137 
138   @Override
139   public void trim() {
140     if (serializedEdges != null &&
141         serializedEdges.length > serializedEdgesBytesUsed) {
142       serializedEdges =
143           Arrays.copyOf(serializedEdges, serializedEdgesBytesUsed);
144     }
145   }
146 
147   /**
148    * Iterator that reuses the same Edge object.
149    */
150   private class ByteArrayEdgeIterator
151       extends UnmodifiableIterator<Edge<I, E>> {
152     /** Input for processing the bytes */
153     private ExtendedDataInput extendedDataInput =
154         getConf().createExtendedDataInput(
155             serializedEdges, 0, serializedEdgesBytesUsed);
156     /** Representative edge object. */
157     private ReusableEdge<I, E> representativeEdge =
158         getConf().createReusableEdge();
159 
160     @Override
161     public boolean hasNext() {
162       return serializedEdges != null && !extendedDataInput.endOfInput();
163     }
164 
165     @Override
166     public Edge<I, E> next() {
167       try {
168         WritableUtils.readEdge(extendedDataInput, representativeEdge);
169       } catch (IOException e) {
170         throw new IllegalStateException("next: Failed on pos " +
171             extendedDataInput.getPos() + " edge " + representativeEdge);
172       }
173       return representativeEdge;
174     }
175   }
176 
177   @Override
178   public Iterator<Edge<I, E>> iterator() {
179     if (edgeCount == 0) {
180       return Iterators.emptyIterator();
181     } else {
182       return new ByteArrayEdgeIterator();
183     }
184   }
185 
186   @Override
187   public void readFields(DataInput in) throws IOException {
188     serializedEdgesBytesUsed = in.readInt();
189     if (serializedEdgesBytesUsed > 0) {
190       // Only create a new buffer if the old one isn't big enough
191       if (serializedEdges == null ||
192           serializedEdgesBytesUsed > serializedEdges.length) {
193         serializedEdges = new byte[serializedEdgesBytesUsed];
194       }
195       in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
196     }
197     edgeCount = in.readInt();
198   }
199 
200   @Override
201   public void write(DataOutput out) throws IOException {
202     out.writeInt(serializedEdgesBytesUsed);
203     if (serializedEdgesBytesUsed > 0) {
204       out.write(serializedEdges, 0, serializedEdgesBytesUsed);
205     }
206     out.writeInt(edgeCount);
207   }
208 }