This project has retired. For details please refer to its Attic page.
TextAggregatorWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.aggregators;
20  
21  import com.google.common.base.Charsets;
22  import java.io.IOException;
23  import java.util.Map.Entry;
24  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25  import org.apache.hadoop.fs.FSDataOutputStream;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.mapreduce.Mapper.Context;
30  
31  /**
32   * Default implementation of {@link AggregatorWriter}. Each line consists of
33   * text and contains the aggregator name, the aggregator value and the
34   * aggregator class.
35   */
36  public class TextAggregatorWriter
37      extends DefaultImmutableClassesGiraphConfigurable
38      implements AggregatorWriter {
39    /** The filename of the outputfile */
40    public static final String FILENAME =
41        "giraph.textAggregatorWriter.filename";
42    /** Signal for "never write" frequency */
43    public static final int NEVER = 0;
44    /** Signal for "write only the final values" frequency */
45    public static final int AT_THE_END = -1;
46    /** Signal for "write values in every superstep" frequency */
47    public static final int ALWAYS = 1;
48    /** The frequency of writing:
49     *  - NEVER: never write, files aren't created at all
50     *  - AT_THE_END: aggregators are written only when the computation is over
51     *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
52     */
53    public static final String FREQUENCY =
54        "giraph.textAggregatorWriter.frequency";
55    /** Default filename for dumping aggregator values */
56    private static final String DEFAULT_FILENAME = "aggregatorValues";
57    /** Handle to the outputfile */
58    protected FSDataOutputStream output;
59    /** Write every "frequency" supersteps */
60    private int frequency;
61  
62    @Override
63    @SuppressWarnings("rawtypes")
64    public void initialize(Context context, long attempt) throws IOException {
65      frequency = getConf().getInt(FREQUENCY, NEVER);
66      String filename  = getConf().get(FILENAME, DEFAULT_FILENAME);
67      if (frequency != NEVER) {
68        Path p = new Path(filename + "_" + attempt);
69        FileSystem fs = FileSystem.get(getConf());
70        if (fs.exists(p)) {
71          throw new RuntimeException("aggregatorWriter file already" +
72              " exists: " + p.getName());
73        }
74        output = fs.create(p);
75      }
76    }
77  
78    @Override
79    public void writeAggregator(
80        Iterable<Entry<String, Writable>> aggregatorMap,
81        long superstep) throws IOException {
82      if (shouldWrite(superstep)) {
83        for (Entry<String, Writable> entry : aggregatorMap) {
84          byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
85              superstep).getBytes(Charsets.UTF_8);
86          output.write(bytes, 0, bytes.length);
87        }
88        output.flush();
89      }
90    }
91  
92    /**
93     * Implements the way an aggregator is converted into a String.
94     * Override this if you want to implement your own text format.
95     *
96     * @param aggregatorName Name of the aggregator
97     * @param value Value of aggregator
98     * @param superstep Current superstep
99     * @return The String representation for the aggregator
100    */
101   protected String aggregatorToString(String aggregatorName,
102       Writable value,
103       long superstep) {
104     return new StringBuilder("superstep=").append(superstep).append("\t")
105         .append(aggregatorName).append("=").append(value).append("\n")
106         .toString();
107   }
108 
109   /**
110    * Should write this superstep?
111    *
112    * @param superstep Superstep to check
113    * @return True if should write, false otherwise
114    */
115   private boolean shouldWrite(long superstep) {
116     return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
117         (frequency != NEVER && frequency != AT_THE_END &&
118             superstep % frequency == 0);
119   }
120 
121   @Override
122   public void close() throws IOException {
123     if (output != null) {
124       output.close();
125     }
126   }
127 }