This project has been retired. For details, please refer to its Attic page.
PseudoRandomIntNullVertexInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import org.apache.giraph.bsp.BspInputSplit;
22  import org.apache.giraph.edge.Edge;
23  import org.apache.giraph.edge.OutEdges;
24  import org.apache.giraph.edge.ReusableEdge;
25  import org.apache.giraph.graph.Vertex;
26  import org.apache.giraph.io.VertexInputFormat;
27  import org.apache.giraph.io.VertexReader;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.io.FloatWritable;
30  import org.apache.hadoop.io.IntWritable;
31  import org.apache.hadoop.io.NullWritable;
32  import org.apache.hadoop.mapreduce.InputSplit;
33  import org.apache.hadoop.mapreduce.JobContext;
34  import org.apache.hadoop.mapreduce.TaskAttemptContext;
35  
36  import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
37  import it.unimi.dsi.fastutil.ints.IntSet;
38  
39  import java.io.IOException;
40  import java.util.List;
41  import java.util.Random;
42  
43  /**
44   * VertexInputFormat for large scale testing,
45   * like {@link PseudoRandomVertexInputFormat}, but for the unweighted graphs
46   * where vertex ids are integers.
47   */
48  public class PseudoRandomIntNullVertexInputFormat extends
49      VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
50    @Override public void checkInputSpecs(Configuration conf) { }
51  
52    @Override
53    public final List<InputSplit> getSplits(final JobContext context,
54        final int minSplitCountHint) throws IOException, InterruptedException {
55      return PseudoRandomUtils.getSplits(minSplitCountHint);
56    }
57  
58    @Override
59    public VertexReader<IntWritable, FloatWritable, NullWritable>
60    createVertexReader(InputSplit split,
61        TaskAttemptContext context) throws IOException {
62      return new PseudoRandomVertexReader();
63    }
64  
65    /**
66     * Used by {@link PseudoRandomIntNullVertexInputFormat} to read
67     * pseudo-randomly generated data.
68     */
69    private static class PseudoRandomVertexReader extends
70        VertexReader<IntWritable, FloatWritable, NullWritable> {
71      /** Starting vertex id. */
72      private int startingVertexId = -1;
73      /** Vertices read so far. */
74      private int verticesRead = 0;
75      /** Total vertices to read (on this split alone). */
76      private int totalSplitVertices = -1;
77      /** Edges per vertex. */
78      private int edgesPerVertex = -1;
79      /** Reusable int set */
80      private final IntSet destVertices = new IntOpenHashSet();
81      /** Resuable edge object */
82      private ReusableEdge<IntWritable, NullWritable> reusableEdge = null;
83      /** Helper for generating pseudo-random local edges. */
84      private PseudoRandomIntNullLocalEdgesHelper localEdgesHelper;
85      /** Random */
86      private Random rand;
87  
88      /** Default constructor for reflection. */
89      public PseudoRandomVertexReader() {
90      }
91  
92      @Override
93      public void initialize(InputSplit inputSplit,
94          TaskAttemptContext context) throws IOException {
95        int aggregateVertices = getConf().getInt(
96            PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
97        BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
98        int extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
99        totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
100       if (bspInputSplit.getSplitIndex() < extraVertices) {
101         ++totalSplitVertices;
102       }
103       startingVertexId = bspInputSplit.getSplitIndex() *
104           (aggregateVertices / bspInputSplit.getNumSplits()) +
105           Math.min(bspInputSplit.getSplitIndex(), extraVertices);
106       edgesPerVertex = getConf().getInt(
107           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
108       rand = new Random(bspInputSplit.getSplitIndex());
109       if (getConf().reuseEdgeObjects()) {
110         reusableEdge = getConf().createReusableEdge();
111       }
112       localEdgesHelper = new PseudoRandomIntNullLocalEdgesHelper(
113           aggregateVertices, getConf());
114     }
115 
116     @Override
117     public boolean nextVertex() throws IOException, InterruptedException {
118       return totalSplitVertices > verticesRead;
119     }
120 
121     @Override
122     public Vertex<IntWritable, FloatWritable, NullWritable>
123     getCurrentVertex() throws IOException, InterruptedException {
124       Vertex<IntWritable, FloatWritable, NullWritable> vertex =
125           getConf().createVertex();
126       int vertexId = startingVertexId + verticesRead;
127       OutEdges<IntWritable, NullWritable> edges =
128           getConf().createOutEdges();
129       edges.initialize(edgesPerVertex);
130       destVertices.clear();
131       for (int i = 0; i < edgesPerVertex; ++i) {
132         int destVertexId;
133         do {
134           destVertexId = localEdgesHelper.generateDestVertex(vertexId, rand);
135         } while (!destVertices.add(destVertexId));
136         Edge<IntWritable, NullWritable> edge =
137             (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
138         edge.getTargetVertexId().set(destVertexId);
139         edges.add(edge);
140       }
141       vertex.initialize(
142           new IntWritable(vertexId), new FloatWritable(1.0f), edges);
143       ++verticesRead;
144       return vertex;
145     }
146 
147     @Override
148     public void close() throws IOException {
149     }
150 
151     @Override
152     public float getProgress() throws IOException {
153       return verticesRead * 100.0f / totalSplitVertices;
154     }
155   }
156 }