This project has retired. For details please refer to its Attic page.
EdgeInputSplitsCallable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24  import org.apache.giraph.edge.Edge;
25  import org.apache.giraph.graph.VertexEdgeCount;
26  import org.apache.giraph.io.EdgeInputFormat;
27  import org.apache.giraph.io.EdgeReader;
28  import org.apache.giraph.io.filters.EdgeInputFilter;
29  import org.apache.giraph.io.InputType;
30  import org.apache.giraph.ooc.OutOfCoreEngine;
31  import org.apache.giraph.utils.LoggerUtils;
32  import org.apache.giraph.utils.MemoryUtils;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.io.WritableComparable;
35  import org.apache.hadoop.mapreduce.InputSplit;
36  import org.apache.hadoop.mapreduce.Mapper;
37  import org.apache.log4j.Level;
38  import org.apache.log4j.Logger;
39  
40  import com.yammer.metrics.core.Counter;
41  import com.yammer.metrics.core.Meter;
42  
43  /**
44   * Load as many edge input splits as possible.
45   * Every thread will has its own instance of WorkerClientRequestProcessor
46   * to send requests.
47   *
48   * @param <I> Vertex id
49   * @param <V> Vertex value
50   * @param <E> Edge value
51   */
52  @SuppressWarnings("unchecked")
53  public class EdgeInputSplitsCallable<I extends WritableComparable,
54      V extends Writable, E extends Writable>
55      extends InputSplitsCallable<I, V, E> {
56    /** How often to update metrics and print info */
57    public static final int EDGES_UPDATE_PERIOD = 1000000;
58    /** How often to update filtered metrics */
59    public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
60  
61    /** Class logger */
62    private static final Logger LOG = Logger.getLogger(
63        EdgeInputSplitsCallable.class);
64  
65    /** Aggregator handler */
66    private final WorkerThreadGlobalCommUsage globalCommUsage;
67    /** Bsp service worker (only use thread-safe methods) */
68    private final BspServiceWorker<I, V, E> bspServiceWorker;
69    /** Edge input format */
70    private final EdgeInputFormat<I, E> edgeInputFormat;
71    /** Input split max edges (-1 denotes all) */
72    private final long inputSplitMaxEdges;
73    /** Can embedInfo in vertexIds */
74    private final boolean canEmbedInIds;
75  
76    /** Filter to use */
77    private final EdgeInputFilter<I, E> edgeInputFilter;
78  
79    // Metrics
80    /** edges loaded meter across all readers */
81    private final Meter totalEdgesMeter;
82    /** edges filtered out by user */
83    private final Counter totalEdgesFiltered;
84  
85    /**
86     * Constructor.
87     *
88     * @param edgeInputFormat Edge input format
89     * @param context Context
90     * @param configuration Configuration
91     * @param bspServiceWorker service worker
92     * @param splitsHandler Handler for input splits
93     */
94    public EdgeInputSplitsCallable(
95        EdgeInputFormat<I, E> edgeInputFormat,
96        Mapper<?, ?, ?, ?>.Context context,
97        ImmutableClassesGiraphConfiguration<I, V, E> configuration,
98        BspServiceWorker<I, V, E> bspServiceWorker,
99        WorkerInputSplitsHandler splitsHandler)  {
100     super(context, configuration, bspServiceWorker, splitsHandler);
101     this.edgeInputFormat = edgeInputFormat;
102 
103     this.bspServiceWorker = bspServiceWorker;
104     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
105     // Initialize aggregator usage.
106     this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
107       .newThreadAggregatorUsage();
108     edgeInputFilter = configuration.getEdgeInputFilter();
109     canEmbedInIds = bspServiceWorker
110         .getLocalData()
111         .getMappingStoreOps() != null &&
112         bspServiceWorker
113             .getLocalData()
114             .getMappingStoreOps()
115             .hasEmbedding();
116 
117     // Initialize Metrics
118     totalEdgesMeter = getTotalEdgesLoadedMeter();
119     totalEdgesFiltered = getTotalEdgesFilteredCounter();
120   }
121 
122   @Override
123   public EdgeInputFormat<I, E> getInputFormat() {
124     return edgeInputFormat;
125   }
126 
127   @Override
128   public InputType getInputType() {
129     return InputType.EDGE;
130   }
131 
132   /**
133    * Read edges from input split.  If testing, the user may request a
134    * maximum number of edges to be read from an input split.
135    *
136    * @param inputSplit Input split to process with edge reader
137    * @return Edges loaded from this input split
138    * @throws IOException
139    * @throws InterruptedException
140    */
141   @Override
142   protected VertexEdgeCount readInputSplit(
143       InputSplit inputSplit) throws IOException,
144       InterruptedException {
145     EdgeReader<I, E> edgeReader =
146         edgeInputFormat.createEdgeReader(inputSplit, context);
147     edgeReader.setConf(
148         (ImmutableClassesGiraphConfiguration<I, Writable, E>)
149             configuration);
150 
151     edgeReader.initialize(inputSplit, context);
152     // Set aggregator usage to edge reader
153     edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
154 
155     long inputSplitEdgesLoaded = 0;
156     long inputSplitEdgesFiltered = 0;
157 
158     int count = 0;
159     OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
160     while (edgeReader.nextEdge()) {
161       // If out-of-core mechanism is used, check whether this thread
162       // can stay active or it should temporarily suspend and stop
163       // processing and generating more data for the moment.
164       if (oocEngine != null &&
165           (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
166         oocEngine.activeThreadCheckIn();
167       }
168       I sourceId = edgeReader.getCurrentSourceId();
169       Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
170       if (sourceId == null) {
171         throw new IllegalArgumentException(
172             "readInputSplit: Edge reader returned an edge " +
173                 "without a source vertex id!  - " + readerEdge);
174       }
175       if (readerEdge.getTargetVertexId() == null) {
176         throw new IllegalArgumentException(
177             "readInputSplit: Edge reader returned an edge " +
178                 "without a target vertex id!  - " + readerEdge);
179       }
180       if (readerEdge.getValue() == null) {
181         throw new IllegalArgumentException(
182             "readInputSplit: Edge reader returned an edge " +
183                 "without a value!  - " + readerEdge);
184       }
185       if (canEmbedInIds) {
186         bspServiceWorker
187             .getLocalData()
188             .getMappingStoreOps()
189             .embedTargetInfo(sourceId);
190         bspServiceWorker
191             .getLocalData()
192             .getMappingStoreOps()
193             .embedTargetInfo(readerEdge.getTargetVertexId());
194       }
195 
196       ++inputSplitEdgesLoaded;
197 
198       if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
199         ++inputSplitEdgesFiltered;
200         if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
201           totalEdgesFiltered.inc(inputSplitEdgesFiltered);
202           inputSplitEdgesFiltered = 0;
203         }
204         continue;
205       }
206 
207       workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
208 
209       // Update status every EDGES_UPDATE_PERIOD edges
210       if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
211         totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
212         WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
213         LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
214             "readEdgeInputSplit: Loaded " +
215                 totalEdgesMeter.count() + " edges at " +
216                 totalEdgesMeter.meanRate() + " edges/sec " +
217                 MemoryUtils.getRuntimeMemoryStats());
218       }
219 
220       // For sampling, or to limit outlier input splits, the number of
221       // records per input split can be limited
222       if (inputSplitMaxEdges > 0 &&
223           inputSplitEdgesLoaded >= inputSplitMaxEdges) {
224         if (LOG.isInfoEnabled()) {
225           LOG.info("readInputSplit: Leaving the input " +
226               "split early, reached maximum edges " +
227               inputSplitEdgesLoaded);
228         }
229         break;
230       }
231     }
232     edgeReader.close();
233 
234     totalEdgesFiltered.inc(inputSplitEdgesFiltered);
235     totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
236 
237     WorkerProgress.get().addEdgesLoaded(
238         inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
239     WorkerProgress.get().incrementEdgeInputSplitsLoaded();
240 
241     return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
242   }
243 }