1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.examples;
2021import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
22import org.apache.giraph.graph.BasicComputation;
23import org.apache.giraph.master.DefaultMasterCompute;
24import org.apache.giraph.graph.Vertex;
25import org.apache.giraph.worker.WorkerContext;
26import org.apache.hadoop.io.DoubleWritable;
27import org.apache.hadoop.io.FloatWritable;
28import org.apache.hadoop.io.LongWritable;
29import org.apache.log4j.Logger;
3031import java.io.IOException;
3233/**34 * Demonstrates a computation with a centralized part implemented via a35 * MasterCompute.36 */37publicclassSimpleMasterComputeComputationextends BasicComputation<
38 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
39/**Aggregator to get values from the master to the workers */40publicstaticfinal String SMC_AGG = "simplemastercompute.aggregator";
41/** Logger */42privatestaticfinal Logger LOG =
43 Logger.getLogger(SimpleMasterComputeComputation.class);
4445 @Override
46publicvoid compute(
47 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
48 Iterable<DoubleWritable> messages) throws IOException {
49double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
50double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
51double newSum = oldSum + newValue;
52 vertex.setValue(new DoubleWritable(newSum));
53SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
54 workerContext.setFinalSum(newSum);
55 LOG.info("Current sum: " + newSum);
56 }
5758/**59 * Worker context used with {@link SimpleMasterComputeComputation}.60 */61publicstaticclassSimpleMasterComputeWorkerContext62extendsWorkerContext {
63/** Final sum value for verification for local jobs */64privatestaticdouble FINAL_SUM;
6566 @Override
67publicvoid preApplication()
68throws InstantiationException, IllegalAccessException {
69 }
7071 @Override
72publicvoid preSuperstep() {
73 }
7475 @Override
76publicvoid postSuperstep() {
77 }
7879 @Override
80publicvoid postApplication() {
81 }
8283publicstaticvoid setFinalSum(double sum) {
84 FINAL_SUM = sum;
85 }
8687publicstaticdouble getFinalSum() {
88return FINAL_SUM;
89 }
90 }
9192/**93 * MasterCompute used with {@link SimpleMasterComputeComputation}.94 */95publicstaticclassSimpleMasterCompute96extendsDefaultMasterCompute {
97 @Override
98publicvoid compute() {
99 setAggregatedValue(SMC_AGG,
100new DoubleWritable(((double) getSuperstep()) / 2 + 1));
101if (getSuperstep() == 10) {
102 haltComputation();
103 }
104 }
105106 @Override
107publicvoid initialize() throws InstantiationException,
108 IllegalAccessException {
109 registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
110 }
111 }
112 }