/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.io.formats;

import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReusableEdge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
4041/**42 * Generates a random Watts-Strogatz graph by re-wiring a ring lattice.43 * The resulting graph is a random graph with high clustering coefficient44 * and low average path length. The graph has these two characteristics that45 * are typical of small-world scale-free graphs, however the degree46 * distribution is more similar to a random graph.47 * It supports a seed for pseudo-random generation.48 */49publicclassWattsStrogatzVertexInputFormatextends50 VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
51/** The number of vertices in the graph */52privatestaticfinal String AGGREGATE_VERTICES =
53"wattsStrogatz.aggregateVertices";
54/** The number of outgoing edges per vertex */55privatestaticfinal String EDGES_PER_VERTEX =
56"wattsStrogatz.edgesPerVertex";
57/** The probability to re-wire an outgoing edge from the ring lattice */58privatestaticfinal String BETA =
59"wattsStrogatz.beta";
60/** The seed to generate random values for pseudo-randomness */61privatestaticfinal String SEED =
62"wattsStrogatz.seed";
6364 @Override
65publicvoid checkInputSpecs(Configuration conf) { }
6667 @Override
68publicfinal List<InputSplit> getSplits(final JobContext context,
69finalint minSplitCountHint) throws IOException, InterruptedException {
70return PseudoRandomUtils.getSplits(minSplitCountHint);
71 }
7273 @Override
74public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
75 createVertexReader(InputSplit split,
76 TaskAttemptContext context) throws IOException {
77returnnewWattsStrogatzVertexReader();
78 }
7980/**81 * Vertex reader used to generate the graph82 */83privatestaticclassWattsStrogatzVertexReaderextends84 VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
85/** the re-wiring probability */86privatefloat beta = 0;
87/** The total number of vertices */88privatelong aggregateVertices = 0;
89/** The starting vertex id for this split */90privatelong startingVertexId = -1;
91/** The number of vertices read so far */92privatelong verticesRead = 0;
93/** The total number of vertices in the split */94privatelong totalSplitVertices = -1;
95/** the total number of outgoing edges per vertex */96privateint edgesPerVertex = -1;
97/** The target ids of the outgoing edges */98privatefinal LongSet destVertices = new LongOpenHashSet();
99/** The random values generator */100private Random rnd;
101/** The reusable edge */102private ReusableEdge<LongWritable, DoubleWritable> reusableEdge = null;
103104/**105 * Default constructor106 */107publicWattsStrogatzVertexReader() { }
108109 @Override
110publicvoid initialize(InputSplit inputSplit,
111 TaskAttemptContext context) throws IOException {
112 beta = getConf().getFloat(
113 BETA, 0.0f);
114 aggregateVertices = getConf().getLong(
115 AGGREGATE_VERTICES, 0);
116BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
117long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
118 totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
119if (bspInputSplit.getSplitIndex() < extraVertices) {
120 ++totalSplitVertices;
121 }
122 startingVertexId = bspInputSplit.getSplitIndex() *
123 (aggregateVertices / bspInputSplit.getNumSplits()) +
124 Math.min(bspInputSplit.getSplitIndex(), extraVertices);
125 edgesPerVertex = getConf().getInt(
126 EDGES_PER_VERTEX, 0);
127if (getConf().reuseEdgeObjects()) {
128 reusableEdge = getConf().createReusableEdge();
129 }
130int seed = getConf().getInt(SEED, -1);
131if (seed != -1) {
132 rnd = new Random(seed);
133 } else {
134 rnd = new Random();
135 }
136 }
137138 @Override
139publicboolean nextVertex() throws IOException, InterruptedException {
140return totalSplitVertices > verticesRead;
141 }
142143/**144 * Return a long value uniformly distributed between 0 (inclusive) and n.145 *146 * @param n the upper bound for the random long value147 * @return the random value148 */149privatelong nextLong(long n) {
150long bits;
151long val;
152do {
153 bits = (rnd.nextLong() << 1) >>> 1;
154 val = bits % n;
155 } while (bits - val + (n - 1) < 0L);
156return val;
157 }
158159/**160 * Get a destination id that is not already in the neighborhood and161 * that is not the vertex itself (no self-loops). For the second condition162 * it expects destVertices to contain the own id already.163 *164 * @return the destination vertex id165 */166privatelong getRandomDestination() {
167long randomId;
168do {
169 randomId = nextLong(aggregateVertices);
170 } while (!destVertices.add(randomId));
171return randomId;
172 }
173174 @Override
175public Vertex<LongWritable, DoubleWritable, DoubleWritable>
176 getCurrentVertex() throws IOException, InterruptedException {
177 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
178 getConf().createVertex();
179long vertexId = startingVertexId + verticesRead;
180 OutEdges<LongWritable, DoubleWritable> edges =
181 getConf().createOutEdges();
182 edges.initialize(edgesPerVertex);
183 destVertices.clear();
184 destVertices.add(vertexId);
185long destVertexId = vertexId - edgesPerVertex / 2;
186if (destVertexId < 0) {
187 destVertexId = aggregateVertices + destVertexId;
188 }
189for (int i = 0; i < edgesPerVertex + 1; ++i) {
190if (destVertexId != vertexId) {
191 Edge<LongWritable, DoubleWritable> edge =
192 (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
193 edge.getTargetVertexId().set(
194 rnd.nextFloat() < beta ? getRandomDestination() : destVertexId);
195 edge.getValue().set(rnd.nextDouble());
196 edges.add(edge);
197 }
198 destVertexId = (destVertexId + 1) % aggregateVertices;
199 }
200 vertex.initialize(new LongWritable(vertexId),
201new DoubleWritable(rnd.nextDouble()), edges);
202 ++verticesRead;
203return vertex;
204 }
205206 @Override
207publicvoid close() throws IOException { }
208209 @Override
210publicfloat getProgress() throws IOException {
211return verticesRead * 100.0f / totalSplitVertices;
212 }
213 }
214 }