1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.worker;
2021import java.io.IOException;
2223import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24import org.apache.giraph.edge.Edge;
25import org.apache.giraph.edge.OutEdges;
26import org.apache.giraph.graph.Vertex;
27import org.apache.giraph.graph.VertexEdgeCount;
28import org.apache.giraph.io.GiraphInputFormat;
29import org.apache.giraph.io.VertexInputFormat;
30import org.apache.giraph.io.VertexReader;
31import org.apache.giraph.io.filters.VertexInputFilter;
32import org.apache.giraph.mapping.translate.TranslateEdge;
33import org.apache.giraph.io.InputType;
34import org.apache.giraph.ooc.OutOfCoreEngine;
35import org.apache.giraph.partition.PartitionOwner;
36import org.apache.giraph.utils.LoggerUtils;
37import org.apache.giraph.utils.MemoryUtils;
38import org.apache.hadoop.io.Writable;
39import org.apache.hadoop.io.WritableComparable;
40import org.apache.hadoop.mapreduce.InputSplit;
41import org.apache.hadoop.mapreduce.Mapper;
42import org.apache.log4j.Level;
43import org.apache.log4j.Logger;
4445import com.yammer.metrics.core.Counter;
46import com.yammer.metrics.core.Meter;
4748/**49 * Load as many vertex input splits as possible.50 * Every thread will has its own instance of WorkerClientRequestProcessor51 * to send requests.52 *53 * @param <I> Vertex index value54 * @param <V> Vertex value55 * @param <E> Edge value56 */57 @SuppressWarnings("unchecked")
58publicclass VertexInputSplitsCallable<I extends WritableComparable,
59 V extends Writable, E extends Writable>
60extends InputSplitsCallable<I, V, E> {
61/** How often to update metrics and print info */62publicstaticfinalint VERTICES_UPDATE_PERIOD = 250000;
63/** How often to update filtered out metrics */64publicstaticfinalint VERTICES_FILTERED_UPDATE_PERIOD = 2500;
6566/** Class logger */67privatestaticfinal Logger LOG =
68 Logger.getLogger(VertexInputSplitsCallable.class);
69/**Vertex input format */70privatefinal VertexInputFormat<I, V, E> vertexInputFormat;
71/** Input split max vertices (-1 denotes all) */72privatefinallong inputSplitMaxVertices;
73/** Bsp service worker (only use thread-safe methods) */74privatefinal BspServiceWorker<I, V, E> bspServiceWorker;
75/** Filter to select which vertices to keep */76privatefinal VertexInputFilter<I, V, E> vertexInputFilter;
77/** Can embedInfo in vertexIds */78privatefinalboolean canEmbedInIds;
79/**80 * Whether the chosen {@link OutEdges} implementation allows for Edge81 * reuse.82 */83privatefinalboolean reuseEdgeObjects;
84/** Used to translate Edges during vertex input phase based on localData */85privatefinal TranslateEdge<I, E> translateEdge;
8687// Metrics88/** number of vertices loaded meter across all readers */89privatefinal Meter totalVerticesMeter;
90/** number of vertices filtered out */91privatefinal Counter totalVerticesFilteredCounter;
92/** number of edges loaded meter across all readers */93privatefinal Meter totalEdgesMeter;
9495/**96 * Constructor.97 *98 * @param vertexInputFormat Vertex input format99 * @param context Context100 * @param configuration Configuration101 * @param bspServiceWorker service worker102 * @param splitsHandler Handler for input splits103 */104publicVertexInputSplitsCallable(
105 VertexInputFormat<I, V, E> vertexInputFormat,
106 Mapper<?, ?, ?, ?>.Context context,
107 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
108 BspServiceWorker<I, V, E> bspServiceWorker,
109WorkerInputSplitsHandler splitsHandler) {
110super(context, configuration, bspServiceWorker, splitsHandler);
111this.vertexInputFormat = vertexInputFormat;
112113 inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
114this.bspServiceWorker = bspServiceWorker;
115 vertexInputFilter = configuration.getVertexInputFilter();
116 reuseEdgeObjects = configuration.reuseEdgeObjects();
117 canEmbedInIds = bspServiceWorker
118 .getLocalData()
119 .getMappingStoreOps() != null &&
120 bspServiceWorker
121 .getLocalData()
122 .getMappingStoreOps()
123 .hasEmbedding();
124 translateEdge = bspServiceWorker.getTranslateEdge();
125126// Initialize Metrics127 totalVerticesMeter = getTotalVerticesLoadedMeter();
128 totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
129 totalEdgesMeter = getTotalEdgesLoadedMeter();
130 }
131132 @Override
133publicGiraphInputFormat getInputFormat() {
134return vertexInputFormat;
135 }
136137 @Override
138publicInputType getInputType() {
139return InputType.VERTEX;
140 }
141142/**143 * Read vertices from input split. If testing, the user may request a144 * maximum number of vertices to be read from an input split.145 *146 * @param inputSplit Input split to process with vertex reader147 * @return Vertices and edges loaded from this input split148 * @throws IOException149 * @throws InterruptedException150 */151 @Override
152protectedVertexEdgeCount readInputSplit(
153 InputSplit inputSplit) throws IOException, InterruptedException {
154 VertexReader<I, V, E> vertexReader =
155 vertexInputFormat.createVertexReader(inputSplit, context);
156 vertexReader.setConf(configuration);
157158WorkerThreadGlobalCommUsage globalCommUsage =
159this.bspServiceWorker
160 .getAggregatorHandler().newThreadAggregatorUsage();
161162 vertexReader.initialize(inputSplit, context);
163// Set aggregator usage to vertex reader164 vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
165166long inputSplitVerticesLoaded = 0;
167long inputSplitVerticesFiltered = 0;
168169long edgesSinceLastUpdate = 0;
170long inputSplitEdgesLoaded = 0;
171172int count = 0;
173OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
174while (vertexReader.nextVertex()) {
175// If out-of-core mechanism is used, check whether this thread176// can stay active or it should temporarily suspend and stop177// processing and generating more data for the moment.178if (oocEngine != null &&
179 (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
180 oocEngine.activeThreadCheckIn();
181 }
182 Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
183if (readerVertex.getId() == null) {
184thrownew IllegalArgumentException(
185"readInputSplit: Vertex reader returned a vertex " +
186"without an id! - " + readerVertex);
187 }
188if (canEmbedInIds) {
189 bspServiceWorker
190 .getLocalData()
191 .getMappingStoreOps()
192 .embedTargetInfo(readerVertex.getId());
193 }
194if (readerVertex.getValue() == null) {
195 readerVertex.setValue(configuration.createVertexValue());
196 }
197 readerVertex.setConf(configuration);
198199 ++inputSplitVerticesLoaded;
200201if (vertexInputFilter.dropVertex(readerVertex)) {
202 ++inputSplitVerticesFiltered;
203if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
204 totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
205 inputSplitVerticesFiltered = 0;
206 }
207continue;
208 }
209210// Before saving to partition-store translate all edges (if present)211if (translateEdge != null) {
212// only iff vertexInput reads edges also213if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
214 OutEdges<I, E> vertexOutEdges = configuration
215 .createAndInitializeOutEdges(readerVertex.getNumEdges());
216// TODO : this works for generic OutEdges, can create a better api217// to support more efficient translation for specific types218219// NOTE : for implementations where edge is reusable, space is220// consumed by the OutEdges data structure itself, but if not reusable221// space is consumed by the newly created edge -> and the new OutEdges222// data structure just holds a reference to the newly created edge223// so in any way we virtually hold edges twice - similar to224// OutEdges.trim() -> this has the same complexity as OutEdges.trim()225for (Edge<I, E> edge : readerVertex.getEdges()) {
226if (reuseEdgeObjects) {
227 bspServiceWorker
228 .getLocalData()
229 .getMappingStoreOps()
230 .embedTargetInfo(edge.getTargetVertexId());
231 vertexOutEdges.add(edge); // edge can be re-used232 } else { // edge objects cannot be reused - so create new edges233 vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
234 }
235 }
236// set out edges to translated instance -> old instance is released237 readerVertex.setEdges(vertexOutEdges);
238 }
239 }
240241PartitionOwner partitionOwner =
242 bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
243 workerClientRequestProcessor.sendVertexRequest(
244 partitionOwner, readerVertex);
245 edgesSinceLastUpdate += readerVertex.getNumEdges();
246247// Update status every VERTICES_UPDATE_PERIOD vertices248if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
249 totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
250 WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
251 totalEdgesMeter.mark(edgesSinceLastUpdate);
252 inputSplitEdgesLoaded += edgesSinceLastUpdate;
253 edgesSinceLastUpdate = 0;
254255 LoggerUtils.setStatusAndLog(
256 context, LOG, Level.INFO,
257"readVertexInputSplit: Loaded " +
258 totalVerticesMeter.count() + " vertices at " +
259 totalVerticesMeter.meanRate() + " vertices/sec " +
260 totalEdgesMeter.count() + " edges at " +
261 totalEdgesMeter.meanRate() + " edges/sec " +
262 MemoryUtils.getRuntimeMemoryStats());
263 }
264265// For sampling, or to limit outlier input splits, the number of266// records per input split can be limited267if (inputSplitMaxVertices > 0 &&
268 inputSplitVerticesLoaded >= inputSplitMaxVertices) {
269if (LOG.isInfoEnabled()) {
270 LOG.info("readInputSplit: Leaving the input " +
271"split early, reached maximum vertices " +
272 inputSplitVerticesLoaded);
273 }
274break;
275 }
276 }
277278 totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
279 totalEdgesMeter.mark(edgesSinceLastUpdate);
280 totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
281282 vertexReader.close();
283284 WorkerProgress.get().addVerticesLoaded(
285 inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
286 WorkerProgress.get().incrementVertexInputSplitsLoaded();
287288returnnewVertexEdgeCount(inputSplitVerticesLoaded,
289 inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
290 }
291 }
292