This project has retired. For details please refer to its
Attic page.
EdgeInputSplitsCallable xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.edge.Edge;
25 import org.apache.giraph.graph.VertexEdgeCount;
26 import org.apache.giraph.io.EdgeInputFormat;
27 import org.apache.giraph.io.EdgeReader;
28 import org.apache.giraph.io.filters.EdgeInputFilter;
29 import org.apache.giraph.io.InputType;
30 import org.apache.giraph.ooc.OutOfCoreEngine;
31 import org.apache.giraph.utils.LoggerUtils;
32 import org.apache.giraph.utils.MemoryUtils;
33 import org.apache.hadoop.io.Writable;
34 import org.apache.hadoop.io.WritableComparable;
35 import org.apache.hadoop.mapreduce.InputSplit;
36 import org.apache.hadoop.mapreduce.Mapper;
37 import org.apache.log4j.Level;
38 import org.apache.log4j.Logger;
39
40 import com.yammer.metrics.core.Counter;
41 import com.yammer.metrics.core.Meter;
42
43
44
45
46
47
48
49
50
51
52 @SuppressWarnings("unchecked")
53 public class EdgeInputSplitsCallable<I extends WritableComparable,
54 V extends Writable, E extends Writable>
55 extends InputSplitsCallable<I, V, E> {
56
57 public static final int EDGES_UPDATE_PERIOD = 1000000;
58
59 public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
60
61
62 private static final Logger LOG = Logger.getLogger(
63 EdgeInputSplitsCallable.class);
64
65
66 private final WorkerThreadGlobalCommUsage globalCommUsage;
67
68 private final BspServiceWorker<I, V, E> bspServiceWorker;
69
70 private final EdgeInputFormat<I, E> edgeInputFormat;
71
72 private final long inputSplitMaxEdges;
73
74 private final boolean canEmbedInIds;
75
76
77 private final EdgeInputFilter<I, E> edgeInputFilter;
78
79
80
81 private final Meter totalEdgesMeter;
82
83 private final Counter totalEdgesFiltered;
84
85
86
87
88
89
90
91
92
93
94 public EdgeInputSplitsCallable(
95 EdgeInputFormat<I, E> edgeInputFormat,
96 Mapper<?, ?, ?, ?>.Context context,
97 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
98 BspServiceWorker<I, V, E> bspServiceWorker,
99 WorkerInputSplitsHandler splitsHandler) {
100 super(context, configuration, bspServiceWorker, splitsHandler);
101 this.edgeInputFormat = edgeInputFormat;
102
103 this.bspServiceWorker = bspServiceWorker;
104 inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
105
106 this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
107 .newThreadAggregatorUsage();
108 edgeInputFilter = configuration.getEdgeInputFilter();
109 canEmbedInIds = bspServiceWorker
110 .getLocalData()
111 .getMappingStoreOps() != null &&
112 bspServiceWorker
113 .getLocalData()
114 .getMappingStoreOps()
115 .hasEmbedding();
116
117
118 totalEdgesMeter = getTotalEdgesLoadedMeter();
119 totalEdgesFiltered = getTotalEdgesFilteredCounter();
120 }
121
122 @Override
123 public EdgeInputFormat<I, E> getInputFormat() {
124 return edgeInputFormat;
125 }
126
127 @Override
128 public InputType getInputType() {
129 return InputType.EDGE;
130 }
131
132
133
134
135
136
137
138
139
140
141 @Override
142 protected VertexEdgeCount readInputSplit(
143 InputSplit inputSplit) throws IOException,
144 InterruptedException {
145 EdgeReader<I, E> edgeReader =
146 edgeInputFormat.createEdgeReader(inputSplit, context);
147 edgeReader.setConf(
148 (ImmutableClassesGiraphConfiguration<I, Writable, E>)
149 configuration);
150
151 edgeReader.initialize(inputSplit, context);
152
153 edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
154
155 long inputSplitEdgesLoaded = 0;
156 long inputSplitEdgesFiltered = 0;
157
158 int count = 0;
159 OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
160 while (edgeReader.nextEdge()) {
161
162
163
164 if (oocEngine != null &&
165 (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
166 oocEngine.activeThreadCheckIn();
167 }
168 I sourceId = edgeReader.getCurrentSourceId();
169 Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
170 if (sourceId == null) {
171 throw new IllegalArgumentException(
172 "readInputSplit: Edge reader returned an edge " +
173 "without a source vertex id! - " + readerEdge);
174 }
175 if (readerEdge.getTargetVertexId() == null) {
176 throw new IllegalArgumentException(
177 "readInputSplit: Edge reader returned an edge " +
178 "without a target vertex id! - " + readerEdge);
179 }
180 if (readerEdge.getValue() == null) {
181 throw new IllegalArgumentException(
182 "readInputSplit: Edge reader returned an edge " +
183 "without a value! - " + readerEdge);
184 }
185 if (canEmbedInIds) {
186 bspServiceWorker
187 .getLocalData()
188 .getMappingStoreOps()
189 .embedTargetInfo(sourceId);
190 bspServiceWorker
191 .getLocalData()
192 .getMappingStoreOps()
193 .embedTargetInfo(readerEdge.getTargetVertexId());
194 }
195
196 ++inputSplitEdgesLoaded;
197
198 if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
199 ++inputSplitEdgesFiltered;
200 if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
201 totalEdgesFiltered.inc(inputSplitEdgesFiltered);
202 inputSplitEdgesFiltered = 0;
203 }
204 continue;
205 }
206
207 workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
208
209
210 if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
211 totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
212 WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
213 LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
214 "readEdgeInputSplit: Loaded " +
215 totalEdgesMeter.count() + " edges at " +
216 totalEdgesMeter.meanRate() + " edges/sec " +
217 MemoryUtils.getRuntimeMemoryStats());
218 }
219
220
221
222 if (inputSplitMaxEdges > 0 &&
223 inputSplitEdgesLoaded >= inputSplitMaxEdges) {
224 if (LOG.isInfoEnabled()) {
225 LOG.info("readInputSplit: Leaving the input " +
226 "split early, reached maximum edges " +
227 inputSplitEdgesLoaded);
228 }
229 break;
230 }
231 }
232 edgeReader.close();
233
234 totalEdgesFiltered.inc(inputSplitEdgesFiltered);
235 totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
236
237 WorkerProgress.get().addEdgesLoaded(
238 inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
239 WorkerProgress.get().incrementEdgeInputSplitsLoaded();
240
241 return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
242 }
243 }