This project has retired. For details please refer to its Attic page.
InputSplitsCallable xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.IOException;
24  import java.util.concurrent.Callable;
25  
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.WorkerClientRequestProcessor;
28  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.graph.VertexEdgeCount;
31  import org.apache.giraph.io.GiraphInputFormat;
32  import org.apache.giraph.io.InputType;
33  import org.apache.giraph.metrics.GiraphMetrics;
34  import org.apache.giraph.metrics.GiraphMetricsRegistry;
35  import org.apache.giraph.metrics.MeterDesc;
36  import org.apache.giraph.metrics.MetricNames;
37  import org.apache.giraph.ooc.OutOfCoreEngine;
38  import org.apache.giraph.time.SystemTime;
39  import org.apache.giraph.time.Time;
40  import org.apache.giraph.time.Times;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  import org.apache.hadoop.mapreduce.InputSplit;
44  import org.apache.hadoop.mapreduce.Mapper;
45  import org.apache.log4j.Logger;
46  
47  import com.yammer.metrics.core.Counter;
48  import com.yammer.metrics.core.Meter;
49  import com.yammer.metrics.util.PercentGauge;
50  
51  /**
52   * Abstract base class for loading vertex/edge input splits.
53   * Every thread will has its own instance of WorkerClientRequestProcessor
54   * to send requests.
55   *
56   * @param <I> Vertex index value
57   * @param <V> Vertex value
58   * @param <E> Edge value
59   */
60  public abstract class InputSplitsCallable<I extends WritableComparable,
61      V extends Writable, E extends Writable>
62      implements Callable<VertexEdgeCount> {
63    /** Class logger */
64    private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
65    /** Class time object */
66    private static final Time TIME = SystemTime.get();
67    /** Configuration */
68    protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
69    /** Context */
70    protected final Mapper<?, ?, ?, ?>.Context context;
71    /** Handles IPC communication */
72    protected final WorkerClientRequestProcessor<I, V, E>
73    workerClientRequestProcessor;
74    /**
75     * Stores and processes the list of InputSplits advertised
76     * in a tree of child znodes by the master.
77     */
78    private final WorkerInputSplitsHandler splitsHandler;
79    /** Get the start time in nanos */
80    private final long startNanos = TIME.getNanoseconds();
81    /** Whether to prioritize local input splits. */
82    private final boolean useLocality;
83    /** Service worker */
84    private final CentralizedServiceWorker<I, V, E> serviceWorker;
85  
86    /**
87     * Constructor.
88     *
89     * @param context Context
90     * @param configuration Configuration
91     * @param bspServiceWorker service worker
92     * @param splitsHandler Handler for input splits
93     */
94    public InputSplitsCallable(
95        Mapper<?, ?, ?, ?>.Context context,
96        ImmutableClassesGiraphConfiguration<I, V, E> configuration,
97        BspServiceWorker<I, V, E> bspServiceWorker,
98        WorkerInputSplitsHandler splitsHandler) {
99      this.context = context;
100     this.workerClientRequestProcessor =
101         new NettyWorkerClientRequestProcessor<I, V, E>(
102             context, configuration, bspServiceWorker,
103             false /* useOneMessageToManyIdsEncoding, not useful for input */);
104     this.useLocality = configuration.useInputSplitLocality();
105     this.splitsHandler = splitsHandler;
106     this.configuration = configuration;
107     this.serviceWorker = bspServiceWorker;
108   }
109 
110   /**
111    * Get input format
112    *
113    * @return Input format
114    */
115   public abstract GiraphInputFormat getInputFormat();
116 
117   /**
118    * Get input type
119    *
120    * @return Input type
121    */
122   public abstract InputType getInputType();
123 
124   /**
125    * Get Meter tracking edges loaded
126    *
127    * @return Meter tracking edges loaded
128    */
129   public static Meter getTotalEdgesLoadedMeter() {
130     return GiraphMetrics.get().perJobRequired()
131         .getMeter(MeterDesc.EDGES_LOADED);
132   }
133 
134   /**
135    * Get Counter tracking edges filtered
136    *
137    * @return Counter tracking edges filtered
138    */
139   public static Counter getTotalEdgesFilteredCounter() {
140     return GiraphMetrics.get().perJobRequired()
141         .getCounter(MetricNames.EDGES_FILTERED);
142   }
143 
144   /**
145    * Get Meter tracking number of vertices loaded.
146    *
147    * @return Meter for vertices loaded
148    */
149   public static Meter getTotalVerticesLoadedMeter() {
150     return GiraphMetrics.get().perJobRequired()
151         .getMeter(MeterDesc.VERTICES_LOADED);
152   }
153 
154   /**
155    * Get Counter tracking vertices filtered
156    *
157    * @return Counter tracking vertices filtered
158    */
159   public static Counter getTotalVerticesFilteredCounter() {
160     return GiraphMetrics.get().perJobRequired()
161         .getCounter(MetricNames.VERTICES_FILTERED);
162   }
163 
164   /**
165    * Initialize metrics used by this class and its subclasses.
166    */
167   public static void initMetrics() {
168     GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
169 
170     final Counter edgesFiltered = getTotalEdgesFilteredCounter();
171     final Meter edgesLoaded = getTotalEdgesLoadedMeter();
172 
173     metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
174       @Override protected double getNumerator() {
175         return edgesFiltered.count();
176       }
177 
178       @Override protected double getDenominator() {
179         return edgesLoaded.count();
180       }
181     });
182 
183     final Counter verticesFiltered = getTotalVerticesFilteredCounter();
184     final Meter verticesLoaded = getTotalVerticesLoadedMeter();
185 
186     metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
187       @Override protected double getNumerator() {
188         return verticesFiltered.count();
189       }
190 
191       @Override protected double getDenominator() {
192         return verticesLoaded.count();
193       }
194     });
195   }
196 
197   /**
198    * Load vertices/edges from the given input split.
199    *
200    * @param inputSplit Input split to load
201    * @return Count of vertices and edges loaded
202    * @throws IOException
203    * @throws InterruptedException
204    */
205   protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
206     throws IOException, InterruptedException;
207 
208   @Override
209   public VertexEdgeCount call() {
210     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
211     int inputSplitsProcessed = 0;
212     try {
213       OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
214       if (oocEngine != null) {
215         oocEngine.processingThreadStart();
216       }
217       while (true) {
218         byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
219             getInputType(), inputSplitsProcessed == 0);
220         if (serializedInputSplit == null) {
221           // No splits left
222           break;
223         }
224         // If out-of-core mechanism is used, check whether this thread
225         // can stay active or it should temporarily suspend and stop
226         // processing and generating more data for the moment.
227         if (oocEngine != null) {
228           oocEngine.activeThreadCheckIn();
229         }
230         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
231             loadInputSplit(serializedInputSplit));
232         context.progress();
233         ++inputSplitsProcessed;
234       }
235       if (oocEngine != null) {
236         oocEngine.processingThreadFinish();
237       }
238     } catch (InterruptedException e) {
239       throw new IllegalStateException("call: InterruptedException", e);
240     } catch (IOException e) {
241       throw new IllegalStateException("call: IOException", e);
242     } catch (ClassNotFoundException e) {
243       throw new IllegalStateException("call: ClassNotFoundException", e);
244     }
245 
246     if (LOG.isInfoEnabled()) {
247       float seconds = Times.getNanosSince(TIME, startNanos) /
248           Time.NS_PER_SECOND_AS_FLOAT;
249       float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
250       float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
251       LOG.info("call: Loaded " + inputSplitsProcessed + " " +
252           "input splits in " + seconds + " secs, " + vertexEdgeCount +
253           " " + verticesPerSecond + " vertices/sec, " +
254           edgesPerSecond + " edges/sec");
255     }
256     try {
257       workerClientRequestProcessor.flush();
258     } catch (IOException e) {
259       throw new IllegalStateException("call: Flushing failed.", e);
260     }
261     return vertexEdgeCount;
262   }
263 
264   /**
265    * Extract vertices from input split, saving them into a mini cache of
266    * partitions.  Periodically flush the cache of vertices when a limit is
267    * reached in readVerticeFromInputSplit.
268    * Mark the input split finished when done.
269    *
270    * @param serializedInputSplit Serialized input split
271    * @return Mapping of vertex indices and statistics, or null if no data read
272    * @throws IOException
273    * @throws ClassNotFoundException
274    * @throws InterruptedException
275    */
276   private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
277       throws IOException, ClassNotFoundException, InterruptedException {
278     InputSplit inputSplit = getInputSplit(serializedInputSplit);
279     VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
280     if (LOG.isInfoEnabled()) {
281       LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
282     }
283     return vertexEdgeCount;
284   }
285 
286   /**
287    * Talk to ZooKeeper to convert the input split path to the actual
288    * InputSplit.
289    *
290    * @param serializedInputSplit Serialized input split
291    * @return instance of InputSplit
292    * @throws IOException
293    * @throws ClassNotFoundException
294    */
295   protected InputSplit getInputSplit(byte[] serializedInputSplit)
296       throws IOException, ClassNotFoundException {
297     DataInputStream inputStream =
298         new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
299     InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
300 
301     if (LOG.isInfoEnabled()) {
302       LOG.info("getInputSplit: Reserved input split '" +
303           inputSplit.toString() + "'");
304     }
305     return inputSplit;
306   }
307 }