1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.aggregators;
2021import com.google.common.base.Charsets;
22import java.io.IOException;
23import java.util.Map.Entry;
24import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25import org.apache.hadoop.fs.FSDataOutputStream;
26import org.apache.hadoop.fs.FileSystem;
27import org.apache.hadoop.fs.Path;
28import org.apache.hadoop.io.Writable;
29import org.apache.hadoop.mapreduce.Mapper.Context;
3031/**32 * Default implementation of {@link AggregatorWriter}. Each line consists of33 * text and contains the aggregator name, the aggregator value and the34 * aggregator class.35 */36publicclassTextAggregatorWriter37extendsDefaultImmutableClassesGiraphConfigurable38implementsAggregatorWriter {
39/** The filename of the outputfile */40publicstaticfinal String FILENAME =
41"giraph.textAggregatorWriter.filename";
42/** Signal for "never write" frequency */43publicstaticfinalint NEVER = 0;
44/** Signal for "write only the final values" frequency */45publicstaticfinalint AT_THE_END = -1;
46/** Signal for "write values in every superstep" frequency */47publicstaticfinalint ALWAYS = 1;
48/** The frequency of writing:49 * - NEVER: never write, files aren't created at all50 * - AT_THE_END: aggregators are written only when the computation is over51 * - int: i.e. 1 is every superstep, 2 every two supersteps and so on52 */53publicstaticfinal String FREQUENCY =
54"giraph.textAggregatorWriter.frequency";
55/** Default filename for dumping aggregator values */56privatestaticfinal String DEFAULT_FILENAME = "aggregatorValues";
57/** Handle to the outputfile */58protected FSDataOutputStream output;
59/** Write every "frequency" supersteps */60privateint frequency;
6162 @Override
63 @SuppressWarnings("rawtypes")
64publicvoid initialize(Context context, long attempt) throws IOException {
65 frequency = getConf().getInt(FREQUENCY, NEVER);
66 String filename = getConf().get(FILENAME, DEFAULT_FILENAME);
67if (frequency != NEVER) {
68 Path p = new Path(filename + "_" + attempt);
69 FileSystem fs = FileSystem.get(getConf());
70if (fs.exists(p)) {
71thrownew RuntimeException("aggregatorWriter file already" +
72" exists: " + p.getName());
73 }
74 output = fs.create(p);
75 }
76 }
7778 @Override
79publicvoid writeAggregator(
80 Iterable<Entry<String, Writable>> aggregatorMap,
81long superstep) throws IOException {
82if (shouldWrite(superstep)) {
83for (Entry<String, Writable> entry : aggregatorMap) {
84 byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
85 superstep).getBytes(Charsets.UTF_8);
86 output.write(bytes, 0, bytes.length);
87 }
88 output.flush();
89 }
90 }
9192/**93 * Implements the way an aggregator is converted into a String.94 * Override this if you want to implement your own text format.95 *96 * @param aggregatorName Name of the aggregator97 * @param value Value of aggregator98 * @param superstep Current superstep99 * @return The String representation for the aggregator100 */101protected String aggregatorToString(String aggregatorName,
102 Writable value,
103long superstep) {
104returnnew StringBuilder("superstep=").append(superstep).append("\t")
105 .append(aggregatorName).append("=").append(value).append("\n")
106 .toString();
107 }
108109/**110 * Should write this superstep?111 *112 * @param superstep Superstep to check113 * @return True if should write, false otherwise114 */115privateboolean shouldWrite(long superstep) {
116return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
117 (frequency != NEVER && frequency != AT_THE_END &&
118 superstep % frequency == 0);
119 }
120121 @Override
122publicvoid close() throws IOException {
123if (output != null) {
124 output.close();
125 }
126 }
127 }