This project has retired. For details please refer to its Attic page.
SimpleAggregatorWriter xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.examples;
20  
21  import java.io.IOException;
22  import java.util.Map.Entry;
23  import org.apache.giraph.aggregators.AggregatorWriter;
24  import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25  import org.apache.hadoop.fs.FSDataOutputStream;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.io.Writable;
29  import org.apache.hadoop.mapreduce.Mapper.Context;
30  
31  /**
32   * This is a simple example for an aggregator writer. After each superstep
33   * the writer will persist the aggregator values to disk, by use of the
34   * Writable interface. The file will be created on the current working
35   * directory.
36   */
37  public class SimpleAggregatorWriter extends
38      DefaultImmutableClassesGiraphConfigurable implements
39      AggregatorWriter {
40    /** Name of the file we wrote to */
41    private static String FILENAME;
42    /** Saved output stream to write to */
43    private FSDataOutputStream output;
44  
45    public static String getFilename() {
46      return FILENAME;
47    }
48  
49    @SuppressWarnings("rawtypes")
50    @Override
51    public void initialize(Context context, long applicationAttempt)
52      throws IOException {
53      setFilename(applicationAttempt);
54      Path p = new Path(FILENAME);
55      FileSystem fs = FileSystem.get(context.getConfiguration());
56      output = fs.create(p, true);
57    }
58  
59    /**
60     * Set filename written to
61     *
62     * @param applicationAttempt app attempt
63     */
64    private static void setFilename(long applicationAttempt) {
65      FILENAME = "aggregatedValues_" + applicationAttempt;
66    }
67  
68    @Override
69    public void writeAggregator(
70        Iterable<Entry<String, Writable>> aggregatorMap,
71        long superstep) throws IOException {
72      for (Entry<String, Writable> entry : aggregatorMap) {
73        entry.getValue().write(output);
74      }
75      output.flush();
76    }
77  
78    @Override
79    public void close() throws IOException {
80      output.close();
81    }
82  }