/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.examples;
2021import java.io.BufferedReader;
22import java.io.IOException;
23import java.io.InputStreamReader;
24import java.nio.charset.Charset;
25import java.util.Set;
2627import org.apache.giraph.worker.WorkerContext;
28import org.apache.hadoop.conf.Configuration;
29import org.apache.hadoop.filecache.DistributedCache;
30import org.apache.hadoop.fs.FileSystem;
31import org.apache.hadoop.fs.Path;
32import org.apache.log4j.Logger;
3334import com.google.common.collect.ImmutableSet;
3536/**37 * Worker context for random walks.38 */39publicclassRandomWalkWorkerContextextendsWorkerContext {
40/** Default maximum number of iterations */41privatestaticfinalint DEFAULT_MAX_SUPERSTEPS = 30;
42/** Default teleportation probability */43privatestaticfinalfloat DEFAULT_TELEPORTATION_PROBABILITY = 0.15f;
44/** Maximum number of iterations */45privatestaticint MAX_SUPERSTEPS;
46/** Teleportation probability */47privatestaticdouble TELEPORTATION_PROBABILITY;
48/** Preference vector */49privatestatic Set<Long> SOURCES;
5051/** Configuration parameter for the source vertex */52privatestaticfinal String SOURCE_VERTEX =
53 RandomWalkWithRestartComputation.class.getName() + ".sourceVertex";
5455/** Logger */56privatestaticfinal Logger LOG = Logger
57 .getLogger(RandomWalkWorkerContext.class);
5859/**60 * @return The maximum number of iterations to perform.61 */62publicint getMaxSupersteps() {
63if (MAX_SUPERSTEPS == 0) {
64thrownew IllegalStateException(
65 RandomWalkWorkerContext.class.getSimpleName() +
66" was not initialized. Relaunch your job " +
67"by setting the appropriate WorkerContext");
68 }
69return MAX_SUPERSTEPS;
70 }
7172/**73 * @return The teleportation probability.74 */75publicdouble getTeleportationProbability() {
76if (TELEPORTATION_PROBABILITY == 0) {
77thrownew IllegalStateException(
78 RandomWalkWorkerContext.class.getSimpleName() +
79" was not initialized. Relaunch your job " +
80"by setting the appropriate WorkerContext");
81 }
82return TELEPORTATION_PROBABILITY;
83 }
8485/**86 * Checks if a vertex is a source.87 * @param id The vertex ID to check.88 * @return True if the vertex is a source in the preference vector.89 */90publicboolean isSource(long id) {
91return SOURCES.contains(id);
92 }
9394/**95 * @return The number of sources in the preference vector.96 */97publicint numSources() {
98return SOURCES.size();
99 }
100101/**102 * Initialize sources for Random Walk with Restart. First option103 * (preferential) is single source given from the command line as a parameter.104 * Second option is a file with a list of vertex IDs, one per line. In this105 * second case the preference vector is a uniform distribution over these106 * vertexes.107 * @param configuration The configuration.108 * @return a (possibly empty) set of source vertices109 */110private ImmutableSet<Long> initializeSources(Configuration configuration) {
111 ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
112long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
113if (sourceVertex != Long.MIN_VALUE) {
114return ImmutableSet.of(sourceVertex);
115 } else {
116 Path sourceFile = null;
117try {
118119 Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
120if (cacheFiles == null || cacheFiles.length == 0) {
121// empty set if no source vertices configured122return ImmutableSet.of();
123 }
124125 sourceFile = cacheFiles[0];
126 FileSystem fs = FileSystem.getLocal(configuration);
127 BufferedReader in = new BufferedReader(new InputStreamReader(
128 fs.open(sourceFile), Charset.defaultCharset()));
129 String line;
130while ((line = in.readLine()) != null) {
131 builder.add(Long.parseLong(line));
132 }
133 in.close();
134 } catch (IOException e) {
135 getContext().setStatus(
136"Could not load local cache files: " + sourceFile);
137 LOG.error("Could not load local cache files: " + sourceFile, e);
138 }
139 }
140return builder.build();
141 }
142143 @Override
144publicvoid preApplication() throws InstantiationException,
145 IllegalAccessException {
146 setStaticVars(getContext().getConfiguration());
147 }
148149/**150 * Set static variables from Configuration151 *152 * @param configuration the conf153 */154privatevoid setStaticVars(Configuration configuration) {
155 MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS,
156 DEFAULT_MAX_SUPERSTEPS);
157 TELEPORTATION_PROBABILITY = configuration.getFloat(
158 RandomWalkComputation.TELEPORTATION_PROBABILITY,
159 DEFAULT_TELEPORTATION_PROBABILITY);
160 SOURCES = initializeSources(configuration);
161 }
162163 @Override
164publicvoid preSuperstep() {
165 }
166167 @Override
168publicvoid postSuperstep() {
169 }
170171 @Override
172publicvoid postApplication() {
173 }
174 }