1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.aggregators;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.comm.GlobalCommType;
24  import org.apache.giraph.utils.WritableUtils;
25  import org.apache.hadoop.io.Writable;
26  
27  /**
28   * Implementation of {@link CountingOutputStream} which allows writing of
29   * reduced values in the form of pair (name, type, value)
30   *
31   * There are two modes:
32   * - when class of the value is written into the stream.
33   * - when it isn't, and reader needs to know Class of the value in order
34   *   to read it.
35   */
36  public class GlobalCommValueOutputStream extends CountingOutputStream {
37    /** whether to write Class object for values into the stream */
38    private final boolean writeClass;
39  
40    /**
41     * Constructor
42     *
43     * @param writeClass boolean whether to write Class object for values
44     */
45    public GlobalCommValueOutputStream(boolean writeClass) {
46      this.writeClass = writeClass;
47    }
48  
49    /**
50     * Write global communication object to the stream
51     * and increment internal counter
52     *
53     * @param name Name
54     * @param type Global communication type
55     * @param value Object value
56     * @return Number of bytes occupied by the stream
57     * @throws IOException
58     */
59    public int addValue(String name, GlobalCommType type,
60        Writable value) throws IOException {
61      incrementCounter();
62      dataOutput.writeUTF(name);
63      dataOutput.writeByte(type.ordinal());
64      if (writeClass) {
65        WritableUtils.writeWritableObject(value, dataOutput);
66      } else {
67        value.write(dataOutput);
68      }
69      return getSize();
70    }
71  }