This project has retired. For details please refer to its
Attic page.
PseudoRandomIntNullVertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.bsp.BspInputSplit;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.edge.OutEdges;
24 import org.apache.giraph.edge.ReusableEdge;
25 import org.apache.giraph.graph.Vertex;
26 import org.apache.giraph.io.VertexInputFormat;
27 import org.apache.giraph.io.VertexReader;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.io.FloatWritable;
30 import org.apache.hadoop.io.IntWritable;
31 import org.apache.hadoop.io.NullWritable;
32 import org.apache.hadoop.mapreduce.InputSplit;
33 import org.apache.hadoop.mapreduce.JobContext;
34 import org.apache.hadoop.mapreduce.TaskAttemptContext;
35
36 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
37 import it.unimi.dsi.fastutil.ints.IntSet;
38
39 import java.io.IOException;
40 import java.util.List;
41 import java.util.Random;
42
43
44
45
46
47
48 public class PseudoRandomIntNullVertexInputFormat extends
49 VertexInputFormat<IntWritable, FloatWritable, NullWritable> {
50 @Override public void checkInputSpecs(Configuration conf) { }
51
52 @Override
53 public final List<InputSplit> getSplits(final JobContext context,
54 final int minSplitCountHint) throws IOException, InterruptedException {
55 return PseudoRandomUtils.getSplits(minSplitCountHint);
56 }
57
58 @Override
59 public VertexReader<IntWritable, FloatWritable, NullWritable>
60 createVertexReader(InputSplit split,
61 TaskAttemptContext context) throws IOException {
62 return new PseudoRandomVertexReader();
63 }
64
65
66
67
68
69 private static class PseudoRandomVertexReader extends
70 VertexReader<IntWritable, FloatWritable, NullWritable> {
71
72 private int startingVertexId = -1;
73
74 private int verticesRead = 0;
75
76 private int totalSplitVertices = -1;
77
78 private int edgesPerVertex = -1;
79
80 private final IntSet destVertices = new IntOpenHashSet();
81
82 private ReusableEdge<IntWritable, NullWritable> reusableEdge = null;
83
84 private PseudoRandomIntNullLocalEdgesHelper localEdgesHelper;
85
86 private Random rand;
87
88
89 public PseudoRandomVertexReader() {
90 }
91
92 @Override
93 public void initialize(InputSplit inputSplit,
94 TaskAttemptContext context) throws IOException {
95 int aggregateVertices = getConf().getInt(
96 PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
97 BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
98 int extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
99 totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
100 if (bspInputSplit.getSplitIndex() < extraVertices) {
101 ++totalSplitVertices;
102 }
103 startingVertexId = bspInputSplit.getSplitIndex() *
104 (aggregateVertices / bspInputSplit.getNumSplits()) +
105 Math.min(bspInputSplit.getSplitIndex(), extraVertices);
106 edgesPerVertex = getConf().getInt(
107 PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
108 rand = new Random(bspInputSplit.getSplitIndex());
109 if (getConf().reuseEdgeObjects()) {
110 reusableEdge = getConf().createReusableEdge();
111 }
112 localEdgesHelper = new PseudoRandomIntNullLocalEdgesHelper(
113 aggregateVertices, getConf());
114 }
115
116 @Override
117 public boolean nextVertex() throws IOException, InterruptedException {
118 return totalSplitVertices > verticesRead;
119 }
120
121 @Override
122 public Vertex<IntWritable, FloatWritable, NullWritable>
123 getCurrentVertex() throws IOException, InterruptedException {
124 Vertex<IntWritable, FloatWritable, NullWritable> vertex =
125 getConf().createVertex();
126 int vertexId = startingVertexId + verticesRead;
127 OutEdges<IntWritable, NullWritable> edges =
128 getConf().createOutEdges();
129 edges.initialize(edgesPerVertex);
130 destVertices.clear();
131 for (int i = 0; i < edgesPerVertex; ++i) {
132 int destVertexId;
133 do {
134 destVertexId = localEdgesHelper.generateDestVertex(vertexId, rand);
135 } while (!destVertices.add(destVertexId));
136 Edge<IntWritable, NullWritable> edge =
137 (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
138 edge.getTargetVertexId().set(destVertexId);
139 edges.add(edge);
140 }
141 vertex.initialize(
142 new IntWritable(vertexId), new FloatWritable(1.0f), edges);
143 ++verticesRead;
144 return vertex;
145 }
146
147 @Override
148 public void close() throws IOException {
149 }
150
151 @Override
152 public float getProgress() throws IOException {
153 return verticesRead * 100.0f / totalSplitVertices;
154 }
155 }
156 }