This project has retired. For details please refer to its Attic page.
PseudoRandomVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import org.apache.giraph.bsp.BspInputSplit;
22  import org.apache.giraph.edge.EdgeFactory;
23  import org.apache.giraph.edge.OutEdges;
24  import org.apache.giraph.graph.Vertex;
25  import org.apache.giraph.io.VertexInputFormat;
26  import org.apache.giraph.io.VertexReader;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.io.DoubleWritable;
29  import org.apache.hadoop.io.LongWritable;
30  import org.apache.hadoop.mapreduce.InputSplit;
31  import org.apache.hadoop.mapreduce.JobContext;
32  import org.apache.hadoop.mapreduce.TaskAttemptContext;
33  import org.apache.log4j.Logger;
34  
35  import com.google.common.collect.Sets;
36  
37  import java.io.IOException;
38  import java.util.List;
39  import java.util.Random;
40  import java.util.Set;
41  
42  /**
43   * This VertexInputFormat is meant for large scale testing.  It allows the user
44   * to create an input data source that a variable number of aggregate vertices
45   * and edges per vertex that is repeatable for the exact same parameter
46   * (pseudo-random).
47   */
48  public class PseudoRandomVertexInputFormat extends
49      VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
50    @Override public void checkInputSpecs(Configuration conf) { }
51  
52    @Override
53    public final List<InputSplit> getSplits(final JobContext context,
54        final int minSplitCountHint) throws IOException, InterruptedException {
55      return PseudoRandomUtils.getSplits(minSplitCountHint);
56    }
57  
58    @Override
59    public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
60    createVertexReader(InputSplit split, TaskAttemptContext context)
61      throws IOException {
62      return new PseudoRandomVertexReader();
63    }
64  
65    /**
66     * Used by {@link PseudoRandomVertexInputFormat} to read
67     * pseudo-randomly generated data.
68     */
69    private static class PseudoRandomVertexReader extends
70        VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
71      /** Logger. */
72      private static final Logger LOG =
73          Logger.getLogger(PseudoRandomVertexReader.class);
74      /** Starting vertex id. */
75      private long startingVertexId = -1;
76      /** Vertices read so far. */
77      private long verticesRead = 0;
78      /** Total vertices to read (on this split alone). */
79      private long totalSplitVertices = -1;
80      /** Aggregate vertices (all input splits). */
81      private long aggregateVertices = -1;
82      /** Edges per vertex. */
83      private int edgesPerVertex = -1;
84      /** BspInputSplit (used only for index). */
85      private BspInputSplit bspInputSplit;
86      /** Helper for generating pseudo-random local edges. */
87      private PseudoRandomLocalEdgesHelper localEdgesHelper;
88  
89      /**
90       * Default constructor for reflection.
91       */
92      public PseudoRandomVertexReader() {
93      }
94  
95      @Override
96      public void initialize(InputSplit inputSplit,
97          TaskAttemptContext context) throws IOException {
98        aggregateVertices = getConf().getLong(
99              PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
100       if (aggregateVertices <= 0) {
101         throw new IllegalArgumentException(
102             PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
103       }
104       if (inputSplit instanceof BspInputSplit) {
105         bspInputSplit = (BspInputSplit) inputSplit;
106         long extraVertices =
107             aggregateVertices % bspInputSplit.getNumSplits();
108         totalSplitVertices =
109             aggregateVertices / bspInputSplit.getNumSplits();
110         if (bspInputSplit.getSplitIndex() < extraVertices) {
111           ++totalSplitVertices;
112         }
113         startingVertexId = (bspInputSplit.getSplitIndex() *
114             (aggregateVertices / bspInputSplit.getNumSplits())) +
115             Math.min(bspInputSplit.getSplitIndex(),
116                      extraVertices);
117       } else {
118         throw new IllegalArgumentException(
119             "initialize: Got " + inputSplit.getClass() +
120             " instead of " + BspInputSplit.class);
121       }
122       edgesPerVertex = getConf().getInt(
123           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
124       if (edgesPerVertex <= 0) {
125         throw new IllegalArgumentException(
126           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
127       }
128       float minLocalEdgesRatio = getConf().getFloat(
129           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
130           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
131       localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
132           minLocalEdgesRatio, getConf());
133     }
134 
135     @Override
136     public boolean nextVertex() throws IOException, InterruptedException {
137       return totalSplitVertices > verticesRead;
138     }
139 
140     @Override
141     public Vertex<LongWritable, DoubleWritable, DoubleWritable>
142     getCurrentVertex() throws IOException, InterruptedException {
143       Vertex<LongWritable, DoubleWritable, DoubleWritable>
144       vertex = getConf().createVertex();
145       long vertexId = startingVertexId + verticesRead;
146       // Seed on the vertex id to keep the vertex data the same when
147       // on different number of workers, but other parameters are the
148       // same.
149       Random rand = new Random(vertexId);
150       DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
151       // In order to save memory and avoid copying, we add directly to a
152       // OutEdges instance.
153       OutEdges<LongWritable, DoubleWritable> edges =
154           getConf().createAndInitializeOutEdges(edgesPerVertex);
155       Set<LongWritable> destVertices = Sets.newHashSet();
156       for (long i = 0; i < edgesPerVertex; ++i) {
157         LongWritable destVertexId = new LongWritable();
158         do {
159           destVertexId.set(
160               localEdgesHelper.generateDestVertex(vertexId, rand));
161         } while (destVertices.contains(destVertexId));
162         edges.add(EdgeFactory.create(destVertexId,
163             new DoubleWritable(rand.nextDouble())));
164         destVertices.add(destVertexId);
165       }
166       vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
167       ++verticesRead;
168       if (LOG.isTraceEnabled()) {
169         LOG.trace("next: Return vertexId=" +
170             vertex.getId().get() +
171             ", vertexValue=" + vertex.getValue() +
172             ", edges=" + vertex.getEdges());
173       }
174       return vertex;
175     }
176 
177     @Override
178     public void close() throws IOException { }
179 
180     @Override
181     public float getProgress() throws IOException {
182       return verticesRead * 100.0f / totalSplitVertices;
183     }
184   }
185 }