This project has retired. For details please refer to its Attic page.
ByteValueVertex xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.graph;
19  
20  import org.apache.giraph.edge.Edge;
21  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
22  import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
23  import org.apache.hadoop.io.Writable;
24  import org.apache.hadoop.io.WritableComparable;
25  
26  import java.io.DataInput;
27  import java.io.IOException;
28  
29  
30  /**
31   * Special version of vertex that holds the value in raw byte form to save
32   * memory.
33   *
34   * @param <I> Vertex id
35   * @param <V> Vertex data
36   * @param <E> Edge data
37   */
38  public class ByteValueVertex<I extends WritableComparable,
39          V extends Writable, E extends Writable>
40          extends DefaultVertex<I, V, E> {
41  
42    /** Vertex value stored as raw bytes */
43    private byte[] valueBytes;
44    /** Value as an cached object that is only valid during the vertex update */
45    private V cachedValue = null;
46  
47    @Override
48    public V getValue() {
49      if (cachedValue != null) {
50        return cachedValue; // Return always same instance
51      }
52      DataInput dis = new UnsafeByteArrayInputStream(valueBytes);
53      cachedValue = getConf().createVertexValue();
54      try {
55        cachedValue.readFields(dis);
56      } catch (IOException ioe) {
57        throw new RuntimeException("Could not deserialize vertex value", ioe);
58      }
59      // Forget the serialized data, because we have cached the object
60      valueBytes = null;
61      return cachedValue;
62    }
63  
64    /**
65     * Serializes the value to bytes, stored in field valueBytes
66     * @param value new vertex value
67     */
68    private void setSerializedValue(V value) {
69      UnsafeByteArrayOutputStream bos = new UnsafeByteArrayOutputStream();
70      try {
71        value.write(bos);
72        bos.close();
73      } catch (IOException ioe) {
74        throw new RuntimeException("Could not serialize vertex value", ioe);
75      }
76      this.valueBytes = bos.toByteArray();
77      cachedValue = null;
78    }
79  
80    @Override
81    public void setValue(V value) {
82      if (cachedValue != null) {
83        cachedValue = value;
84      } else {
85        setSerializedValue(value);
86      }
87    }
88  
89    @Override
90    public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
91      // Set the parent's value to null, and instead use our own setter
92      super.initialize(id, null, edges);
93      setValue(value);
94    }
95  
96    @Override
97    public void initialize(I id, V value) {
98      super.initialize(id, null);
99      setValue(value);
100   }
101 
102   @Override
103   public void unwrapMutableEdges() {
104     // This method is called always after compute(vertex), so
105     // we use this to commit the vertex value.
106     if (cachedValue != null) {
107       // This means the value has been requested from vertex
108       // and possible mutated -- so we need to update the byte array
109       setSerializedValue(cachedValue);
110       cachedValue = null;  // Uncache the value
111     }
112     super.unwrapMutableEdges();
113   }
114 }