This project has retired. For details please refer to its
Attic page.
ByteArrayEdges xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.edge;
20
21 import com.google.common.collect.UnmodifiableIterator;
22 import org.apache.giraph.utils.ExtendedDataInput;
23 import org.apache.giraph.utils.ExtendedDataOutput;
24 import org.apache.giraph.utils.Trimmable;
25 import org.apache.giraph.utils.WritableUtils;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29 import java.io.DataInput;
30 import java.io.DataOutput;
31 import java.io.IOException;
32 import java.util.Arrays;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Collections;
37
38
39
40
41
42
43
44
45
46
47 public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
48 extends ConfigurableOutEdges<I, E>
49 implements ReuseObjectsOutEdges<I, E>, Trimmable {
50
51 private byte[] serializedEdges;
52
53 private int serializedEdgesBytesUsed;
54
55 private int edgeCount;
56
57 @Override
58 public void initialize(Iterable<Edge<I, E>> edges) {
59 ExtendedDataOutput extendedOutputStream =
60 getConf().createExtendedDataOutput();
61 for (Edge<I, E> edge : edges) {
62 try {
63 WritableUtils.writeEdge(extendedOutputStream, edge);
64 } catch (IOException e) {
65 throw new IllegalStateException("initialize: Failed to serialize " +
66 edge);
67 }
68 ++edgeCount;
69 }
70 serializedEdges = extendedOutputStream.getByteArray();
71 serializedEdgesBytesUsed = extendedOutputStream.getPos();
72 }
73
74 @Override
75 public void initialize(int capacity) {
76
77
78 initialize();
79 }
80
81 @Override
82 public void initialize() {
83
84
85 }
86
87 @Override
88 public void add(Edge<I, E> edge) {
89 ExtendedDataOutput extendedDataOutput =
90 getConf().createExtendedDataOutput(
91 serializedEdges, serializedEdgesBytesUsed);
92 try {
93 WritableUtils.writeEdge(extendedDataOutput, edge);
94 } catch (IOException e) {
95 throw new IllegalStateException("add: Failed to write to the new " +
96 "byte array");
97 } catch (NegativeArraySizeException negativeArraySizeException) {
98 throw new IllegalStateException("add: Too many edges for a vertex, " +
99 "hence failed to write to byte array");
100 }
101 serializedEdges = extendedDataOutput.getByteArray();
102 serializedEdgesBytesUsed = extendedDataOutput.getPos();
103 ++edgeCount;
104 }
105
106 @Override
107 public void remove(I targetVertexId) {
108
109 ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
110 List<Integer> foundStartOffsets = new LinkedList<Integer>();
111 List<Integer> foundEndOffsets = new LinkedList<Integer>();
112 int lastStartOffset = 0;
113 while (iterator.hasNext()) {
114 Edge<I, E> edge = iterator.next();
115 if (edge.getTargetVertexId().equals(targetVertexId)) {
116 foundStartOffsets.add(lastStartOffset);
117 foundEndOffsets.add(iterator.extendedDataInput.getPos());
118 --edgeCount;
119 }
120 lastStartOffset = iterator.extendedDataInput.getPos();
121 }
122 foundStartOffsets.add(serializedEdgesBytesUsed);
123
124 Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
125 Integer foundStartOffset = foundStartOffsetIter.next();
126 for (Integer foundEndOffset : foundEndOffsets) {
127 Integer nextFoundStartOffset = foundStartOffsetIter.next();
128 System.arraycopy(serializedEdges, foundEndOffset,
129 serializedEdges, foundStartOffset,
130 nextFoundStartOffset - foundEndOffset);
131 serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
132 foundStartOffset = nextFoundStartOffset;
133 }
134 }
135
136 @Override
137 public int size() {
138 return edgeCount;
139 }
140
141 @Override
142 public void trim() {
143 if (serializedEdges != null &&
144 serializedEdges.length > serializedEdgesBytesUsed) {
145 serializedEdges =
146 Arrays.copyOf(serializedEdges, serializedEdgesBytesUsed);
147 }
148 }
149
150
151
152
153 private class ByteArrayEdgeIterator
154 extends UnmodifiableIterator<Edge<I, E>> {
155
156 private ExtendedDataInput extendedDataInput =
157 getConf().createExtendedDataInput(
158 serializedEdges, 0, serializedEdgesBytesUsed);
159
160 private ReusableEdge<I, E> representativeEdge =
161 getConf().createReusableEdge();
162
163 @Override
164 public boolean hasNext() {
165 return serializedEdges != null && !extendedDataInput.endOfInput();
166 }
167
168 @Override
169 public Edge<I, E> next() {
170 try {
171 WritableUtils.readEdge(extendedDataInput, representativeEdge);
172 } catch (IOException e) {
173 throw new IllegalStateException("next: Failed on pos " +
174 extendedDataInput.getPos() + " edge " + representativeEdge);
175 }
176 return representativeEdge;
177 }
178 }
179
180 @Override
181 public Iterator<Edge<I, E>> iterator() {
182 if (edgeCount == 0) {
183 return Collections.emptyListIterator();
184 } else {
185 return new ByteArrayEdgeIterator();
186 }
187 }
188
189 @Override
190 public void readFields(DataInput in) throws IOException {
191 serializedEdgesBytesUsed = in.readInt();
192 if (serializedEdgesBytesUsed > 0) {
193
194 if (serializedEdges == null ||
195 serializedEdgesBytesUsed > serializedEdges.length) {
196 serializedEdges = new byte[serializedEdgesBytesUsed];
197 }
198 in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
199 }
200 edgeCount = in.readInt();
201 }
202
203 @Override
204 public void write(DataOutput out) throws IOException {
205 out.writeInt(serializedEdgesBytesUsed);
206 if (serializedEdgesBytesUsed > 0) {
207 out.write(serializedEdges, 0, serializedEdgesBytesUsed);
208 }
209 out.writeInt(edgeCount);
210 }
211 }