This project has been retired. For details, please refer to its
Attic page.
PseudoRandomVertexInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import org.apache.giraph.bsp.BspInputSplit;
22 import org.apache.giraph.edge.EdgeFactory;
23 import org.apache.giraph.edge.OutEdges;
24 import org.apache.giraph.graph.Vertex;
25 import org.apache.giraph.io.VertexInputFormat;
26 import org.apache.giraph.io.VertexReader;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.io.DoubleWritable;
29 import org.apache.hadoop.io.LongWritable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.TaskAttemptContext;
33 import org.apache.log4j.Logger;
34
35 import com.google.common.collect.Sets;
36
37 import java.io.IOException;
38 import java.util.List;
39 import java.util.Random;
40 import java.util.Set;
41
42
43
44
45
46
47
48 public class PseudoRandomVertexInputFormat extends
49 VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
50 @Override public void checkInputSpecs(Configuration conf) { }
51
52 @Override
53 public final List<InputSplit> getSplits(final JobContext context,
54 final int minSplitCountHint) throws IOException, InterruptedException {
55 return PseudoRandomUtils.getSplits(minSplitCountHint);
56 }
57
58 @Override
59 public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
60 createVertexReader(InputSplit split, TaskAttemptContext context)
61 throws IOException {
62 return new PseudoRandomVertexReader();
63 }
64
65
66
67
68
69 private static class PseudoRandomVertexReader extends
70 VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
71
72 private static final Logger LOG =
73 Logger.getLogger(PseudoRandomVertexReader.class);
74
75 private long startingVertexId = -1;
76
77 private long verticesRead = 0;
78
79 private long totalSplitVertices = -1;
80
81 private long aggregateVertices = -1;
82
83 private int edgesPerVertex = -1;
84
85 private BspInputSplit bspInputSplit;
86
87 private PseudoRandomLocalEdgesHelper localEdgesHelper;
88
89
90
91
92 public PseudoRandomVertexReader() {
93 }
94
95 @Override
96 public void initialize(InputSplit inputSplit,
97 TaskAttemptContext context) throws IOException {
98 aggregateVertices = getConf().getLong(
99 PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
100 if (aggregateVertices <= 0) {
101 throw new IllegalArgumentException(
102 PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
103 }
104 if (inputSplit instanceof BspInputSplit) {
105 bspInputSplit = (BspInputSplit) inputSplit;
106 long extraVertices =
107 aggregateVertices % bspInputSplit.getNumSplits();
108 totalSplitVertices =
109 aggregateVertices / bspInputSplit.getNumSplits();
110 if (bspInputSplit.getSplitIndex() < extraVertices) {
111 ++totalSplitVertices;
112 }
113 startingVertexId = (bspInputSplit.getSplitIndex() *
114 (aggregateVertices / bspInputSplit.getNumSplits())) +
115 Math.min(bspInputSplit.getSplitIndex(),
116 extraVertices);
117 } else {
118 throw new IllegalArgumentException(
119 "initialize: Got " + inputSplit.getClass() +
120 " instead of " + BspInputSplit.class);
121 }
122 edgesPerVertex = getConf().getInt(
123 PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
124 if (edgesPerVertex <= 0) {
125 throw new IllegalArgumentException(
126 PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
127 }
128 float minLocalEdgesRatio = getConf().getFloat(
129 PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
130 PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
131 localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
132 minLocalEdgesRatio, getConf());
133 }
134
135 @Override
136 public boolean nextVertex() throws IOException, InterruptedException {
137 return totalSplitVertices > verticesRead;
138 }
139
140 @Override
141 public Vertex<LongWritable, DoubleWritable, DoubleWritable>
142 getCurrentVertex() throws IOException, InterruptedException {
143 Vertex<LongWritable, DoubleWritable, DoubleWritable>
144 vertex = getConf().createVertex();
145 long vertexId = startingVertexId + verticesRead;
146
147
148
149 Random rand = new Random(vertexId);
150 DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
151
152
153 OutEdges<LongWritable, DoubleWritable> edges =
154 getConf().createAndInitializeOutEdges(edgesPerVertex);
155 Set<LongWritable> destVertices = Sets.newHashSet();
156 for (long i = 0; i < edgesPerVertex; ++i) {
157 LongWritable destVertexId = new LongWritable();
158 do {
159 destVertexId.set(
160 localEdgesHelper.generateDestVertex(vertexId, rand));
161 } while (destVertices.contains(destVertexId));
162 edges.add(EdgeFactory.create(destVertexId,
163 new DoubleWritable(rand.nextDouble())));
164 destVertices.add(destVertexId);
165 }
166 vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
167 ++verticesRead;
168 if (LOG.isTraceEnabled()) {
169 LOG.trace("next: Return vertexId=" +
170 vertex.getId().get() +
171 ", vertexValue=" + vertex.getValue() +
172 ", edges=" + vertex.getEdges());
173 }
174 return vertex;
175 }
176
177 @Override
178 public void close() throws IOException { }
179
180 @Override
181 public float getProgress() throws IOException {
182 return verticesRead * 100.0f / totalSplitVertices;
183 }
184 }
185 }