This project has retired. For details please refer to its Attic page.
PseudoRandomEdgeInputFormat xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.io.formats;
20  
21  import com.google.common.collect.Sets;
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.Random;
25  import java.util.Set;
26  import org.apache.giraph.bsp.BspInputSplit;
27  import org.apache.giraph.edge.Edge;
28  import org.apache.giraph.edge.EdgeFactory;
29  import org.apache.giraph.io.EdgeInputFormat;
30  import org.apache.giraph.io.EdgeReader;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.io.DoubleWritable;
33  import org.apache.hadoop.io.LongWritable;
34  import org.apache.hadoop.mapreduce.InputSplit;
35  import org.apache.hadoop.mapreduce.JobContext;
36  import org.apache.hadoop.mapreduce.TaskAttemptContext;
37  import org.apache.log4j.Logger;
38  
39  /**
40   * This {@link EdgeInputFormat} generates pseudo-random edges on the fly.
41   * As with {@link PseudoRandomVertexInputFormat}, the user specifies the
42   * number of vertices and the number of edges per vertex.
43   */
44  public class PseudoRandomEdgeInputFormat
45      extends EdgeInputFormat<LongWritable, DoubleWritable> {
46    @Override public void checkInputSpecs(Configuration conf) { }
47  
48    @Override
49    public final List<InputSplit> getSplits(final JobContext context,
50                                            final int minSplitCountHint)
51      throws IOException, InterruptedException {
52      return PseudoRandomUtils.getSplits(minSplitCountHint);
53    }
54  
55    @Override
56    public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
57        InputSplit split, TaskAttemptContext context) throws IOException {
58      return new PseudoRandomEdgeReader();
59    }
60  
61    /**
62     * {@link EdgeReader} that generates pseudo-random edges.
63     */
64    private static class PseudoRandomEdgeReader
65        extends EdgeReader<LongWritable, DoubleWritable>  {
66      /** Logger. */
67      private static final Logger LOG =
68          Logger.getLogger(PseudoRandomEdgeReader.class);
69      /** Starting vertex id. */
70      private long startingVertexId = -1;
71      /** Vertices read so far. */
72      private long verticesRead = 0;
73      /** Total vertices to read (on this split alone). */
74      private long totalSplitVertices = -1;
75      /** Current vertex id. */
76      private LongWritable currentVertexId = new LongWritable(-1);
77      /** Edges read for the current vertex. */
78      private int currentOutEdgesRead = 0;
79      /** Target vertices of edges for current vertex. */
80      private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
81      /** Random number generator for the current vertex (for consistency
82       * across runs on different numbers of workers). */
83      private Random random = new Random();
84      /** Aggregate vertices (all input splits). */
85      private long aggregateVertices = -1;
86      /** Edges per vertex. */
87      private int edgesPerVertex = -1;
88      /** BspInputSplit (used only for index). */
89      private BspInputSplit bspInputSplit;
90      /** Helper for generating pseudo-random local edges. */
91      private PseudoRandomLocalEdgesHelper localEdgesHelper;
92  
93      @Override
94      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
95        throws IOException, InterruptedException {
96        aggregateVertices = getConf().getLong(
97                PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
98        if (aggregateVertices <= 0) {
99          throw new IllegalArgumentException(
100             PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
101       }
102       if (inputSplit instanceof BspInputSplit) {
103         bspInputSplit = (BspInputSplit) inputSplit;
104         long extraVertices =
105             aggregateVertices % bspInputSplit.getNumSplits();
106         totalSplitVertices =
107             aggregateVertices / bspInputSplit.getNumSplits();
108         if (bspInputSplit.getSplitIndex() < extraVertices) {
109           ++totalSplitVertices;
110         }
111         startingVertexId = (bspInputSplit.getSplitIndex() *
112             (aggregateVertices / bspInputSplit.getNumSplits())) +
113             Math.min(bspInputSplit.getSplitIndex(),
114                 extraVertices);
115       } else {
116         throw new IllegalArgumentException(
117             "initialize: Got " + inputSplit.getClass() +
118                 " instead of " + BspInputSplit.class);
119       }
120       edgesPerVertex = getConf().getInt(
121           PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
122       if (edgesPerVertex <= 0) {
123         throw new IllegalArgumentException(
124             PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
125       }
126       float minLocalEdgesRatio = getConf().getFloat(
127           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
128           PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
129       localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
130           minLocalEdgesRatio, getConf());
131     }
132 
133     @Override
134     public boolean nextEdge() throws IOException, InterruptedException {
135       return totalSplitVertices > verticesRead + 1 ||
136           (totalSplitVertices == verticesRead + 1 &&
137               edgesPerVertex > currentOutEdgesRead);
138     }
139 
140     @Override
141     public LongWritable getCurrentSourceId() throws IOException,
142         InterruptedException {
143       if (currentOutEdgesRead == edgesPerVertex) {
144         ++verticesRead;
145         currentVertexId = new LongWritable(-1);
146       }
147 
148       if (currentVertexId.get() == -1) {
149         currentVertexId.set(startingVertexId + verticesRead);
150         currentOutEdgesRead = 0;
151         // Seed on the vertex id to keep the vertex data the same when
152         // on different number of workers, but other parameters are the
153         // same.
154         random.setSeed(currentVertexId.get());
155         currentVertexDestVertices.clear();
156       }
157       return currentVertexId;
158     }
159 
160     @Override
161     public Edge<LongWritable, DoubleWritable> getCurrentEdge()
162       throws IOException, InterruptedException {
163       LongWritable destVertexId = new LongWritable();
164       do {
165         destVertexId.set(localEdgesHelper.generateDestVertex(
166             currentVertexId.get(), random));
167       } while (currentVertexDestVertices.contains(destVertexId));
168       ++currentOutEdgesRead;
169       currentVertexDestVertices.add(destVertexId);
170       if (LOG.isTraceEnabled()) {
171         LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
172             "" + destVertexId + ")");
173       }
174       return EdgeFactory.create(
175           destVertexId,
176           new DoubleWritable(random.nextDouble()));
177     }
178 
179     @Override
180     public void close() throws IOException { }
181 
182     @Override
183     public float getProgress() throws IOException, InterruptedException {
184       return (verticesRead * edgesPerVertex + currentOutEdgesRead) *
185           100.0f / (totalSplitVertices * edgesPerVertex);
186     }
187   }
188 }