This project has retired. For details please refer to its Attic page.
LongDiffNullArrayEdges xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.edge;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  import java.util.Iterator;
24  
25  import javax.annotation.concurrent.NotThreadSafe;
26  
27  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
28  import org.apache.giraph.utils.EdgeIterables;
29  import org.apache.giraph.utils.Trimmable;
30  import org.apache.hadoop.io.LongWritable;
31  import org.apache.hadoop.io.NullWritable;
32  import org.apache.hadoop.io.Writable;
33  
34  /**
35   * Implementation of {@link org.apache.giraph.edge.OutEdges} with long ids
36   * and null edge values, backed by a dynamic primitive array.
37   * Parallel edges are allowed.
38   * Note: this implementation is optimized for space usage,
39   * but random access and edge removals are expensive.
40   * Users of this class should explicitly call {@link #trim()} function
41   * to compact in-memory representation after all updates are done.
42   * Compacting object is expensive so should only be done once after bulk update.
43   * Compaction can also be caused by serialization attempt or
44   * by calling {@link #iterator()}
45   */
46  @NotThreadSafe
47  public class LongDiffNullArrayEdges
48      extends ConfigurableOutEdges<LongWritable, NullWritable>
49      implements ReuseObjectsOutEdges<LongWritable, NullWritable>,
50      MutableOutEdges<LongWritable, NullWritable>, Trimmable {
51  
52    /**
53     * Compressed array of target vertex ids.
54     */
55    private LongDiffArray edges = new LongDiffArray();
56  
57    @Override
58    public void setConf(ImmutableClassesGiraphConfiguration
59                        <LongWritable, Writable, NullWritable> conf) {
60      super.setConf(conf);
61      edges.setUseUnsafeSerialization(conf.getUseUnsafeSerialization());
62    }
63  
64    @Override
65    public void initialize(
66      Iterable<Edge<LongWritable, NullWritable>> edgeIterator
67    ) {
68      edges.initialize();
69      EdgeIterables.initialize(this, edgeIterator);
70      edges.trim();
71    }
72  
73    @Override
74    public void initialize(int capacity) {
75      edges.initialize(capacity);
76    }
77  
78    @Override
79    public void initialize() {
80      edges.initialize();
81    }
82  
83    @Override
84    public void add(Edge<LongWritable, NullWritable> edge) {
85      edges.add(edge.getTargetVertexId().get());
86    }
87  
88  
89    @Override
90    public void remove(LongWritable targetVertexId) {
91      edges.remove(targetVertexId.get());
92    }
93  
94    @Override
95    public int size() {
96      return edges.size();
97    }
98  
99    @Override
100   public Iterator<Edge<LongWritable, NullWritable>> iterator() {
101     // Returns an iterator that reuses objects.
102     // The downcast is fine because all concrete Edge implementations are
103     // mutable, but we only expose the mutation functionality when appropriate.
104     return (Iterator) mutableIterator();
105   }
106 
107   @Override
108   public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() {
109     trim();
110     return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
111       private final Iterator<LongWritable> reader = edges.iterator();
112 
113       /** Representative edge object. */
114       private final MutableEdge<LongWritable, NullWritable> representativeEdge =
115           EdgeFactory.createReusable(new LongWritable());
116 
117       @Override
118       public boolean hasNext() {
119         return reader.hasNext();
120       }
121 
122       @Override
123       public MutableEdge<LongWritable, NullWritable> next() {
124         representativeEdge.getTargetVertexId().set(reader.next().get());
125         return representativeEdge;
126       }
127 
128       @Override
129       public void remove() {
130         reader.remove();
131       }
132     };
133   }
134 
135   @Override
136   public void write(DataOutput out) throws IOException {
137     edges.write(out);
138   }
139 
140   @Override
141   public void readFields(DataInput in) throws IOException {
142     edges.readFields(in);
143   }
144 
145   /**
146    * This function takes all recent updates and stores them efficiently.
147    * It is safe to call this function multiple times.
148    */
149   @Override
150   public void trim() {
151     edges.trim();
152   }
153 }