1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.edge;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.io.IOException;
23import java.util.Iterator;
2425import javax.annotation.concurrent.NotThreadSafe;
2627import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28import org.apache.giraph.utils.EdgeIterables;
29import org.apache.giraph.utils.Trimmable;
30import org.apache.hadoop.io.LongWritable;
31import org.apache.hadoop.io.NullWritable;
32import org.apache.hadoop.io.Writable;
3334/**35 * Implementation of {@link org.apache.giraph.edge.OutEdges} with long ids36 * and null edge values, backed by a dynamic primitive array.37 * Parallel edges are allowed.38 * Note: this implementation is optimized for space usage,39 * but random access and edge removals are expensive.40 * Users of this class should explicitly call {@link #trim()} function41 * to compact in-memory representation after all updates are done.42 * Compacting object is expensive so should only be done once after bulk update.43 * Compaction can also be caused by serialization attempt or44 * by calling {@link #iterator()}45 */46 @NotThreadSafe
47publicclassLongDiffNullArrayEdges48extends ConfigurableOutEdges<LongWritable, NullWritable>
49implements ReuseObjectsOutEdges<LongWritable, NullWritable>,
50 MutableOutEdges<LongWritable, NullWritable>, Trimmable {
5152/**53 * Compressed array of target vertex ids.54 */55privateLongDiffArray edges = newLongDiffArray();
5657 @Override
58publicvoid setConf(ImmutableClassesGiraphConfiguration59 <LongWritable, Writable, NullWritable> conf) {
60super.setConf(conf);
61 edges.setUseUnsafeSerialization(conf.getUseUnsafeSerialization());
62 }
6364 @Override
65publicvoid initialize(
66 Iterable<Edge<LongWritable, NullWritable>> edgeIterator
67 ) {
68 edges.initialize();
69 EdgeIterables.initialize(this, edgeIterator);
70 edges.trim();
71 }
7273 @Override
74publicvoid initialize(int capacity) {
75 edges.initialize(capacity);
76 }
7778 @Override
79publicvoid initialize() {
80 edges.initialize();
81 }
8283 @Override
84publicvoid add(Edge<LongWritable, NullWritable> edge) {
85 edges.add(edge.getTargetVertexId().get());
86 }
878889 @Override
90publicvoid remove(LongWritable targetVertexId) {
91 edges.remove(targetVertexId.get());
92 }
9394 @Override
95publicint size() {
96return edges.size();
97 }
9899 @Override
100public Iterator<Edge<LongWritable, NullWritable>> iterator() {
101// Returns an iterator that reuses objects.102// The downcast is fine because all concrete Edge implementations are103// mutable, but we only expose the mutation functionality when appropriate.104return (Iterator) mutableIterator();
105 }
106107 @Override
108public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
109 trim();
110returnnew Iterator<MutableEdge<LongWritable, NullWritable>>() {
111privatefinal Iterator<LongWritable> reader = edges.iterator();
112113/** Representative edge object. */114privatefinal MutableEdge<LongWritable, NullWritable> representativeEdge =
115 EdgeFactory.createReusable(new LongWritable());
116117 @Override
118publicboolean hasNext() {
119return reader.hasNext();
120 }
121122 @Override
123public MutableEdge<LongWritable, NullWritable> next() {
124 representativeEdge.getTargetVertexId().set(reader.next().get());
125return representativeEdge;
126 }
127128 @Override
129publicvoid remove() {
130 reader.remove();
131 }
132 };
133 }
134135 @Override
136publicvoid write(DataOutput out) throws IOException {
137 edges.write(out);
138 }
139140 @Override
141publicvoid readFields(DataInput in) throws IOException {
142 edges.readFields(in);
143 }
144145/**146 * This function takes all recent updates and stores them efficiently.147 * It is safe to call this function multiple times.148 */149 @Override
150publicvoid trim() {
151 edges.trim();
152 }
153 }