This project has retired. For details please refer to its Attic page.
WattsStrogatzVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.io.formats;
19  
20  import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
21  import it.unimi.dsi.fastutil.longs.LongSet;
22  
23  import java.io.IOException;
24  import java.util.List;
25  import java.util.Random;
26  
27  import org.apache.giraph.bsp.BspInputSplit;
28  import org.apache.giraph.edge.Edge;
29  import org.apache.giraph.edge.OutEdges;
30  import org.apache.giraph.edge.ReusableEdge;
31  import org.apache.giraph.graph.Vertex;
32  import org.apache.giraph.io.VertexInputFormat;
33  import org.apache.giraph.io.VertexReader;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.io.DoubleWritable;
36  import org.apache.hadoop.io.LongWritable;
37  import org.apache.hadoop.mapreduce.InputSplit;
38  import org.apache.hadoop.mapreduce.JobContext;
39  import org.apache.hadoop.mapreduce.TaskAttemptContext;
40  
41  /**
42   * Generates a random Watts-Strogatz graph by re-wiring a ring lattice.
43   * The resulting graph is a random graph with high clustering coefficient
44   * and low average path length. The graph has these two characteristics that
45   * are typical of small-world scale-free graphs, however the degree
46   * distribution is more similar to a random graph.
47   * It supports a seed for pseudo-random generation.
48   */
49  public class WattsStrogatzVertexInputFormat extends
50    VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
51    /** The number of vertices in the graph */
52    private static final String AGGREGATE_VERTICES =
53        "wattsStrogatz.aggregateVertices";
54    /** The number of outgoing edges per vertex */
55    private static final String EDGES_PER_VERTEX =
56        "wattsStrogatz.edgesPerVertex";
57    /** The probability to re-wire an outgoing edge from the ring lattice */
58    private static final String BETA =
59        "wattsStrogatz.beta";
60    /** The seed to generate random values for pseudo-randomness */
61    private static final String SEED =
62        "wattsStrogatz.seed";
63  
64    @Override
65    public void checkInputSpecs(Configuration conf) { }
66  
67    @Override
68    public final List<InputSplit> getSplits(final JobContext context,
69        final int minSplitCountHint) throws IOException, InterruptedException {
70      return PseudoRandomUtils.getSplits(minSplitCountHint);
71    }
72  
73    @Override
74    public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
75    createVertexReader(InputSplit split,
76        TaskAttemptContext context) throws IOException {
77      return new WattsStrogatzVertexReader();
78    }
79  
80    /**
81     * Vertex reader used to generate the graph
82     */
83    private static class WattsStrogatzVertexReader extends
84      VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
85      /** the re-wiring probability */
86      private float beta = 0;
87      /** The total number of vertices */
88      private long aggregateVertices = 0;
89      /** The starting vertex id for this split */
90      private long startingVertexId = -1;
91      /** The number of vertices read so far */
92      private long verticesRead = 0;
93      /** The total number of vertices in the split */
94      private long totalSplitVertices = -1;
95      /** the total number of outgoing edges per vertex */
96      private int edgesPerVertex = -1;
97      /** The target ids of the outgoing edges */
98      private final LongSet destVertices = new LongOpenHashSet();
99      /** The random values generator */
100     private Random rnd;
101     /** The reusable edge */
102     private ReusableEdge<LongWritable, DoubleWritable> reusableEdge = null;
103 
104     /**
105      * Default constructor
106      */
107     public WattsStrogatzVertexReader() { }
108 
109     @Override
110     public void initialize(InputSplit inputSplit,
111         TaskAttemptContext context) throws IOException {
112       beta = getConf().getFloat(
113           BETA, 0.0f);
114       aggregateVertices = getConf().getLong(
115           AGGREGATE_VERTICES, 0);
116       BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
117       long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
118       totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
119       if (bspInputSplit.getSplitIndex() < extraVertices) {
120         ++totalSplitVertices;
121       }
122       startingVertexId = bspInputSplit.getSplitIndex() *
123           (aggregateVertices / bspInputSplit.getNumSplits()) +
124           Math.min(bspInputSplit.getSplitIndex(), extraVertices);
125       edgesPerVertex = getConf().getInt(
126           EDGES_PER_VERTEX, 0);
127       if (getConf().reuseEdgeObjects()) {
128         reusableEdge = getConf().createReusableEdge();
129       }
130       int seed = getConf().getInt(SEED, -1);
131       if (seed != -1) {
132         rnd = new Random(seed);
133       } else {
134         rnd = new Random();
135       }
136     }
137 
138     @Override
139     public boolean nextVertex() throws IOException, InterruptedException {
140       return totalSplitVertices > verticesRead;
141     }
142 
143     /**
144      * Return a long value uniformly distributed between 0 (inclusive) and n.
145      *
146      * @param n the upper bound for the random long value
147      * @return the random value
148      */
149     private long nextLong(long n) {
150       long bits;
151       long val;
152       do {
153         bits = (rnd.nextLong() << 1) >>> 1;
154         val = bits % n;
155       } while (bits - val + (n - 1) < 0L);
156       return val;
157     }
158 
159     /**
160      * Get a destination id that is not already in the neighborhood and
161      * that is not the vertex itself (no self-loops). For the second condition
162      * it expects destVertices to contain the own id already.
163      *
164      * @return the destination vertex id
165      */
166     private long getRandomDestination() {
167       long randomId;
168       do {
169         randomId = nextLong(aggregateVertices);
170       } while (!destVertices.add(randomId));
171       return randomId;
172     }
173 
174     @Override
175     public Vertex<LongWritable, DoubleWritable, DoubleWritable>
176     getCurrentVertex() throws IOException, InterruptedException {
177       Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
178           getConf().createVertex();
179       long vertexId = startingVertexId + verticesRead;
180       OutEdges<LongWritable, DoubleWritable> edges =
181           getConf().createOutEdges();
182       edges.initialize(edgesPerVertex);
183       destVertices.clear();
184       destVertices.add(vertexId);
185       long destVertexId = vertexId - edgesPerVertex / 2;
186       if (destVertexId < 0) {
187         destVertexId = aggregateVertices + destVertexId;
188       }
189       for (int i = 0; i < edgesPerVertex + 1; ++i) {
190         if (destVertexId != vertexId) {
191           Edge<LongWritable, DoubleWritable> edge =
192               (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
193           edge.getTargetVertexId().set(
194               rnd.nextFloat() < beta ? getRandomDestination() : destVertexId);
195           edge.getValue().set(rnd.nextDouble());
196           edges.add(edge);
197         }
198         destVertexId = (destVertexId + 1) % aggregateVertices;
199       }
200       vertex.initialize(new LongWritable(vertexId),
201           new DoubleWritable(rnd.nextDouble()), edges);
202       ++verticesRead;
203       return vertex;
204     }
205 
206     @Override
207     public void close() throws IOException { }
208 
209     @Override
210     public float getProgress() throws IOException {
211       return verticesRead * 100.0f / totalSplitVertices;
212     }
213   }
214 }