1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.graph;
1920import org.apache.giraph.edge.Edge;
21import org.apache.giraph.utils.UnsafeByteArrayInputStream;
22import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
23import org.apache.hadoop.io.Writable;
24import org.apache.hadoop.io.WritableComparable;
2526import java.io.DataInput;
27import java.io.IOException;
282930/**31 * Special version of vertex that holds the value in raw byte form to save32 * memory.33 *34 * @param <I> Vertex id35 * @param <V> Vertex data36 * @param <E> Edge data37 */38publicclass ByteValueVertex<I extends WritableComparable,
39 V extends Writable, E extends Writable>
40extends DefaultVertex<I, V, E> {
4142/**Vertex value stored as raw bytes */43private byte[] valueBytes;
44/** Value as an cached object that is only valid during the vertex update */45private V cachedValue = null;
4647 @Override
48public V getValue() {
49if (cachedValue != null) {
50return cachedValue; // Return always same instance51 }
52 DataInput dis = newUnsafeByteArrayInputStream(valueBytes);
53 cachedValue = getConf().createVertexValue();
54try {
55 cachedValue.readFields(dis);
56 } catch (IOException ioe) {
57thrownew RuntimeException("Could not deserialize vertex value", ioe);
58 }
59// Forget the serialized data, because we have cached the object60 valueBytes = null;
61return cachedValue;
62 }
6364/**65 * Serializes the value to bytes, stored in field valueBytes66 * @param value new vertex value67 */68privatevoid setSerializedValue(V value) {
69UnsafeByteArrayOutputStream bos = newUnsafeByteArrayOutputStream();
70try {
71 value.write(bos);
72 bos.close();
73 } catch (IOException ioe) {
74thrownew RuntimeException("Could not serialize vertex value", ioe);
75 }
76this.valueBytes = bos.toByteArray();
77 cachedValue = null;
78 }
7980 @Override
81publicvoid setValue(V value) {
82if (cachedValue != null) {
83 cachedValue = value;
84 } else {
85 setSerializedValue(value);
86 }
87 }
8889 @Override
90publicvoid initialize(I id, V value, Iterable<Edge<I, E>> edges) {
91// Set the parent's value to null, and instead use our own setter92super.initialize(id, null, edges);
93 setValue(value);
94 }
9596 @Override
97publicvoid initialize(I id, V value) {
98super.initialize(id, null);
99 setValue(value);
100 }
101102 @Override
103publicvoid unwrapMutableEdges() {
104// This method is called always after compute(vertex), so105// we use this to commit the vertex value.106if (cachedValue != null) {
107// This means the value has been requested from vertex108// and possible mutated -- so we need to update the byte array109 setSerializedValue(cachedValue);
110 cachedValue = null; // Uncache the value111 }
112super.unwrapMutableEdges();
113 }
114 }