/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.examples;
2021import java.io.IOException;
22import java.util.Map.Entry;
23import org.apache.giraph.aggregators.AggregatorWriter;
24import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25import org.apache.hadoop.fs.FSDataOutputStream;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.mapreduce.Mapper.Context;
3031/**32 * This is a simple example for an aggregator writer. After each superstep33 * the writer will persist the aggregator values to disk, by use of the34 * Writable interface. The file will be created on the current working35 * directory.36 */37publicclassSimpleAggregatorWriterextends38DefaultImmutableClassesGiraphConfigurableimplements39AggregatorWriter {
40/** Name of the file we wrote to */41privatestatic String FILENAME;
42/** Saved output stream to write to */43private FSDataOutputStream output;
4445publicstatic String getFilename() {
46return FILENAME;
47 }
4849 @SuppressWarnings("rawtypes")
50 @Override
51publicvoid initialize(Context context, long applicationAttempt)
52throws IOException {
53 setFilename(applicationAttempt);
54 Path p = new Path(FILENAME);
55 FileSystem fs = FileSystem.get(context.getConfiguration());
56 output = fs.create(p, true);
57 }
5859/**60 * Set filename written to61 *62 * @param applicationAttempt app attempt63 */64privatestaticvoid setFilename(long applicationAttempt) {
65 FILENAME = "aggregatedValues_" + applicationAttempt;
66 }
6768 @Override
69publicvoid writeAggregator(
70 Iterable<Entry<String, Writable>> aggregatorMap,
71long superstep) throws IOException {
72for (Entry<String, Writable> entry : aggregatorMap) {
73 entry.getValue().write(output);
74 }
75 output.flush();
76 }
7778 @Override
79publicvoid close() throws IOException {
80 output.close();
81 }
82 }