This project has retired. For details please refer to its
Attic page.
PseudoRandomEdgeInputFormat xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.io.formats;
20
21 import com.google.common.collect.Sets;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.Random;
25 import java.util.Set;
26 import org.apache.giraph.bsp.BspInputSplit;
27 import org.apache.giraph.edge.Edge;
28 import org.apache.giraph.edge.EdgeFactory;
29 import org.apache.giraph.io.EdgeInputFormat;
30 import org.apache.giraph.io.EdgeReader;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.io.DoubleWritable;
33 import org.apache.hadoop.io.LongWritable;
34 import org.apache.hadoop.mapreduce.InputSplit;
35 import org.apache.hadoop.mapreduce.JobContext;
36 import org.apache.hadoop.mapreduce.TaskAttemptContext;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44 public class PseudoRandomEdgeInputFormat
45 extends EdgeInputFormat<LongWritable, DoubleWritable> {
46 @Override public void checkInputSpecs(Configuration conf) { }
47
48 @Override
49 public final List<InputSplit> getSplits(final JobContext context,
50 final int minSplitCountHint)
51 throws IOException, InterruptedException {
52 return PseudoRandomUtils.getSplits(minSplitCountHint);
53 }
54
55 @Override
56 public EdgeReader<LongWritable, DoubleWritable> createEdgeReader(
57 InputSplit split, TaskAttemptContext context) throws IOException {
58 return new PseudoRandomEdgeReader();
59 }
60
61
62
63
64 private static class PseudoRandomEdgeReader
65 extends EdgeReader<LongWritable, DoubleWritable> {
66
67 private static final Logger LOG =
68 Logger.getLogger(PseudoRandomEdgeReader.class);
69
70 private long startingVertexId = -1;
71
72 private long verticesRead = 0;
73
74 private long totalSplitVertices = -1;
75
76 private LongWritable currentVertexId = new LongWritable(-1);
77
78 private int currentOutEdgesRead = 0;
79
80 private Set<LongWritable> currentVertexDestVertices = Sets.newHashSet();
81
82
83 private Random random = new Random();
84
85 private long aggregateVertices = -1;
86
87 private int edgesPerVertex = -1;
88
89 private BspInputSplit bspInputSplit;
90
91 private PseudoRandomLocalEdgesHelper localEdgesHelper;
92
93 @Override
94 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
95 throws IOException, InterruptedException {
96 aggregateVertices = getConf().getLong(
97 PseudoRandomInputFormatConstants.AGGREGATE_VERTICES, 0);
98 if (aggregateVertices <= 0) {
99 throw new IllegalArgumentException(
100 PseudoRandomInputFormatConstants.AGGREGATE_VERTICES + " <= 0");
101 }
102 if (inputSplit instanceof BspInputSplit) {
103 bspInputSplit = (BspInputSplit) inputSplit;
104 long extraVertices =
105 aggregateVertices % bspInputSplit.getNumSplits();
106 totalSplitVertices =
107 aggregateVertices / bspInputSplit.getNumSplits();
108 if (bspInputSplit.getSplitIndex() < extraVertices) {
109 ++totalSplitVertices;
110 }
111 startingVertexId = (bspInputSplit.getSplitIndex() *
112 (aggregateVertices / bspInputSplit.getNumSplits())) +
113 Math.min(bspInputSplit.getSplitIndex(),
114 extraVertices);
115 } else {
116 throw new IllegalArgumentException(
117 "initialize: Got " + inputSplit.getClass() +
118 " instead of " + BspInputSplit.class);
119 }
120 edgesPerVertex = getConf().getInt(
121 PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
122 if (edgesPerVertex <= 0) {
123 throw new IllegalArgumentException(
124 PseudoRandomInputFormatConstants.EDGES_PER_VERTEX + " <= 0");
125 }
126 float minLocalEdgesRatio = getConf().getFloat(
127 PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
128 PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO_DEFAULT);
129 localEdgesHelper = new PseudoRandomLocalEdgesHelper(aggregateVertices,
130 minLocalEdgesRatio, getConf());
131 }
132
133 @Override
134 public boolean nextEdge() throws IOException, InterruptedException {
135 return totalSplitVertices > verticesRead + 1 ||
136 (totalSplitVertices == verticesRead + 1 &&
137 edgesPerVertex > currentOutEdgesRead);
138 }
139
140 @Override
141 public LongWritable getCurrentSourceId() throws IOException,
142 InterruptedException {
143 if (currentOutEdgesRead == edgesPerVertex) {
144 ++verticesRead;
145 currentVertexId = new LongWritable(-1);
146 }
147
148 if (currentVertexId.get() == -1) {
149 currentVertexId.set(startingVertexId + verticesRead);
150 currentOutEdgesRead = 0;
151
152
153
154 random.setSeed(currentVertexId.get());
155 currentVertexDestVertices.clear();
156 }
157 return currentVertexId;
158 }
159
160 @Override
161 public Edge<LongWritable, DoubleWritable> getCurrentEdge()
162 throws IOException, InterruptedException {
163 LongWritable destVertexId = new LongWritable();
164 do {
165 destVertexId.set(localEdgesHelper.generateDestVertex(
166 currentVertexId.get(), random));
167 } while (currentVertexDestVertices.contains(destVertexId));
168 ++currentOutEdgesRead;
169 currentVertexDestVertices.add(destVertexId);
170 if (LOG.isTraceEnabled()) {
171 LOG.trace("getCurrentEdge: Return edge (" + currentVertexId + ", " +
172 "" + destVertexId + ")");
173 }
174 return EdgeFactory.create(
175 destVertexId,
176 new DoubleWritable(random.nextDouble()));
177 }
178
179 @Override
180 public void close() throws IOException { }
181
182 @Override
183 public float getProgress() throws IOException, InterruptedException {
184 return (verticesRead * edgesPerVertex + currentOutEdgesRead) *
185 100.0f / (totalSplitVertices * edgesPerVertex);
186 }
187 }
188 }