1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.edge;

import it.unimi.dsi.fastutil.bytes.ByteArrays;
import it.unimi.dsi.fastutil.longs.LongArrayList;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.giraph.utils.Varint;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import com.google.common.base.Preconditions;

4344/**45 * Compressed list array of long ids.46 * Note: this implementation is optimized for space usage,47 * but random access and edge removals are expensive.48 * Users of this class should explicitly call {@link #trim()} function49 * to compact in-memory representation after all updates are done.50 * Compacting object is expensive so should only be done once after bulk update.51 * Compaction can also be caused by serialization attempt or52 * by calling {@link #iterator()}53 */54 @NotThreadSafe
55publicclassLongDiffArrayimplements Writable {
5657/**58 * Array of target vertex ids.59 */60private byte[] compressedData;
61/**62 * Number of edges stored in compressed array.63 * There may be some extra edges in transientData or there may be some edges64 * removed. These will not count here. To get real number of elements stored65 * in this object @see {@link #size()}66 */67privateint size;
6869/**70 * Last updates are stored here. We clear them out after object is compacted.71 */72privateTransientChanges transientData;
7374/**75 * Use unsafe serialization?76 */77privateboolean useUnsafeSerialization = true;
7879/**80 * Set whether to use unsafe serailization81 * @param useUnsafeSerialization use unsafe serialization82 */83publicvoid setUseUnsafeSerialization(boolean useUnsafeSerialization) {
84this.useUnsafeSerialization = useUnsafeSerialization;
85 }
8687/**88 * Initialize with a given capacity89 * @param capacity capacity90 */91publicvoid initialize(int capacity) {
92 reset();
93if (capacity > 0) {
94 transientData = newTransientChanges(capacity);
95 }
96 }
9798/**99 * Initialize array100 */101publicvoid initialize() {
102 reset();
103 }
104105/**106 * Add a value107 * @param id id to add108 */109publicvoid add(long id) {
110 checkTransientData();
111 transientData.add(id);
112 }
113114115/**116 * Remove a given value117 * @param id id to remove118 */119publicvoid remove(long id) {
120 checkTransientData();
121122if (size > 0) {
123LongsDiffReader reader = newLongsDiffReader(
124 compressedData,
125 useUnsafeSerialization
126 );
127for (int i = 0; i < size; i++) {
128long cur = reader.readNext();
129if (cur == id) {
130 transientData.markRemoved(i);
131 } elseif (cur > id) {
132break;
133 }
134 }
135 }
136 transientData.removeAdded(id);
137 }
138139/**140 * The number of stored ids141 * @return the number of stored ids142 */143publicint size() {
144int result = size;
145if (transientData != null) {
146 result += transientData.size();
147 }
148return result;
149 }
150151/**152 * Returns an iterator that reuses objects.153 * @return Iterator154 */155public Iterator<LongWritable> iterator() {
156 trim();
157returnnew Iterator<LongWritable>() {
158/** Current position in the array. */159privateint position;
160privatefinalLongsDiffReader reader =
161newLongsDiffReader(compressedData, useUnsafeSerialization);
162163/** Representative edge object. */164privatefinal LongWritable reusableLong = new LongWritable();
165166 @Override
167publicboolean hasNext() {
168return position < size;
169 }
170171 @Override
172public LongWritable next() {
173 position++;
174 reusableLong.set(reader.readNext());
175return reusableLong;
176 }
177178 @Override
179publicvoid remove() {
180 removeAt(position - 1);
181 }
182 };
183 }
184185 @Override
186publicvoid write(DataOutput out) throws IOException {
187 trim();
188 Varint.writeUnsignedVarInt(compressedData.length, out);
189 Varint.writeUnsignedVarInt(size, out);
190 out.write(compressedData);
191 }
192193 @Override
194publicvoid readFields(DataInput in) throws IOException {
195 reset();
196 compressedData = new byte[Varint.readUnsignedVarInt(in)];
197// We can actually calculate size after data array is read,198// the trade-off is memory vs speed199 size = Varint.readUnsignedVarInt(in);
200 in.readFully(compressedData);
201 }
202203/**204 * This function takes all recent updates and stores them efficiently.205 * It is safe to call this function multiple times.206 */207publicvoid trim() {
208if (transientData == null) {
209// We don't have any updates to this object. Return quickly.210return;
211 }
212213// Beware this array is longer than the number of elements we interested in214long[] transientValues = transientData.sortedValues();
215int pCompressed = 0;
216int pTransient = 0;
217218LongsDiffReader reader = newLongsDiffReader(
219 compressedData,
220 useUnsafeSerialization
221 );
222LongsDiffWriter writer = newLongsDiffWriter(useUnsafeSerialization);
223224long curValue = size > 0 ? reader.readNext() : Long.MAX_VALUE;
225226// Here we merge freshly added elements and old elements, we also want227// to prune removed elements. Both arrays are sorted so in order to merge228// them, we move to pointers and store result in the new array229while (pTransient < transientData.numberOfAddedElements() ||
230 pCompressed < size) {
231if (pTransient < transientData.numberOfAddedElements() &&
232 curValue >= transientValues[pTransient]) {
233 writer.writeNext(transientValues[pTransient]);
234 pTransient++;
235 } else {
236if (!transientData.isRemoved(pCompressed)) {
237 writer.writeNext(curValue);
238 }
239 pCompressed++;
240if (pCompressed < size) {
241 curValue = reader.readNext();
242 } else {
243 curValue = Long.MAX_VALUE;
244 }
245 }
246 }
247248 compressedData = writer.toByteArray();
249 size += transientData.size();
250 transientData = null;
251 }
252253254/**255 * Remove edge at position i.256 *257 * @param i Position of edge to be removed258 */259privatevoid removeAt(int i) {
260 checkTransientData();
261if (i < size) {
262 transientData.markRemoved(i);
263 } else {
264 transientData.removeAddedAt(i - size);
265 }
266 }
267268/**269 * Check if transient data needs to be created.270 */271privatevoid checkTransientData() {
272if (transientData == null) {
273 transientData = newTransientChanges();
274 }
275 }
276277/**278 * Reset object to completely empty state.279 */280privatevoid reset() {
281 compressedData = ByteArrays.EMPTY_ARRAY;
282 size = 0;
283 transientData = null;
284 }
285286/**287 * Reading array of longs diff encoded from byte array.288 */289privatestaticclassLongsDiffReader {
290/** Input stream */291privatefinalExtendedDataInput input;
292/** last read value */293privatelong current;
294/** True if we haven't read any numbers yet */295privateboolean first = true;
296297/**298 * Construct LongsDiffReader299 *300 * @param compressedData Input byte array301 * @param useUnsafeReader use unsafe reader302 */303publicLongsDiffReader(byte[] compressedData, boolean useUnsafeReader) {
304if (useUnsafeReader) {
305 input = newUnsafeByteArrayInputStream(compressedData);
306 } else {
307 input = newExtendedByteArrayDataInput(compressedData);
308 }
309 }
310311/**312 * Read next value from reader313 * @return next value314 */315long readNext() {
316try {
317if (first) {
318 current = input.readLong();
319 first = false;
320 } else {
321 current += Varint.readUnsignedVarLong(input);
322 }
323return current;
324 } catch (IOException e) {
325thrownew IllegalStateException(e);
326 }
327 }
328 }
329330/**331 * Writing array of longs diff encoded into the byte array.332 */333privatestaticclassLongsDiffWriter {
334/** Wrapping resultStream into DataOutputStream */335privatefinalExtendedDataOutput out;
336/** last value written */337privatelong lastWritten;
338/** True if we haven't written any numbers yet */339privateboolean first = true;
340341/**342 * Construct LongsDiffWriter343 * @param useUnsafeWriter use unsafe writer344 */345publicLongsDiffWriter(boolean useUnsafeWriter) {
346if (useUnsafeWriter) {
347 out = newUnsafeByteArrayOutputStream();
348 } else {
349 out = newExtendedByteArrayDataOutput();
350 }
351 }
352353/**354 * Write next value to writer355 * @param value Value to be written356 */357void writeNext(long value) {
358try {
359if (first) {
360 out.writeLong(value);
361 first = false;
362 } else {
363 Preconditions.checkState(value >= lastWritten,
364"Values need to be in order");
365 Preconditions.checkState((value - lastWritten) >= 0,
366"In order to use this class, difference of consecutive IDs " +
367"cannot overflow longs");
368 Varint.writeUnsignedVarLong(value - lastWritten, out);
369 }
370 lastWritten = value;
371 } catch (IOException e) {
372thrownew IllegalStateException(e);
373 }
374 }
375376/**377 * Get resulting byte array378 * @return resulting byte array379 */380 byte[] toByteArray() {
381return out.toByteArray();
382 }
383 }
384385/**386 * Temporary storage for all updates.387 * We don't want to update compressed array frequently so we only update it388 * on request at the same time we allow temporary updates to persist in this389 * class.390 */391privatestaticclassTransientChanges {
392/** Neighbors that were added since last flush */393privatefinal LongArrayList neighborsAdded;
394/** Removed indices in original array */395privatefinal BitSet removed = new BitSet();
396/** Number of values removed */397privateint removedCount;
398399/**400 * Construct transient changes with given capacity401 * @param capacity capacity402 */403privateTransientChanges(int capacity) {
404 neighborsAdded = new LongArrayList(capacity);
405 }
406407/**408 * Construct transient changes409 */410privateTransientChanges() {
411 neighborsAdded = new LongArrayList();
412 }
413414/**415 * Add new value416 * @param value value to add417 */418privatevoid add(long value) {
419 neighborsAdded.add(value);
420 }
421422/**423 * Mark given index to remove424 * @param index Index to remove425 */426privatevoid markRemoved(int index) {
427if (!removed.get(index)) {
428 removedCount++;
429 removed.set(index);
430 }
431 }
432433/**434 * Remove value from neighborsAdded435 * @param index Position to remove from436 */437privatevoid removeAddedAt(int index) {
438// The order of the edges is irrelevant, so we can simply replace439// the deleted edge with the rightmost element, thus achieving constant440// time.441if (index == neighborsAdded.size() - 1) {
442 neighborsAdded.popLong();
443 } else {
444 neighborsAdded.set(index, neighborsAdded.popLong());
445 }
446 }
447448/**449 * Number of added elements450 * @return number of added elements451 */452privateint numberOfAddedElements() {
453return neighborsAdded.size();
454 }
455456/**457 * Remove added value458 * @param target value to remove459 */460privatevoid removeAdded(long target) {
461 neighborsAdded.rem(target);
462 }
463464/**465 * Additional size in transient changes466 * @return additional size467 */468privateint size() {
469return neighborsAdded.size() - removedCount;
470 }
471472/**473 * Sorted added values474 * @return sorted added values475 */476privatelong[] sortedValues() {
477long[] ret = neighborsAdded.elements();
478 Arrays.sort(ret, 0, neighborsAdded.size());
479return ret;
480 }
481482/**483 * Check if index was removed484 * @param i Index to check485 * @return Whether it was removed486 */487privateboolean isRemoved(int i) {
488return removed.get(i);
489 }
490 }
491 }