View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.worker;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInputStream;
23  import java.io.IOException;
24  import java.util.concurrent.Callable;
25  
26  import org.apache.giraph.bsp.CentralizedServiceWorker;
27  import org.apache.giraph.comm.WorkerClientRequestProcessor;
28  import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
29  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
30  import org.apache.giraph.graph.VertexEdgeCount;
31  import org.apache.giraph.io.GiraphInputFormat;
32  import org.apache.giraph.io.InputType;
33  import org.apache.giraph.metrics.GiraphMetrics;
34  import org.apache.giraph.metrics.GiraphMetricsRegistry;
35  import org.apache.giraph.metrics.MeterDesc;
36  import org.apache.giraph.metrics.MetricNames;
37  import org.apache.giraph.ooc.OutOfCoreEngine;
38  import org.apache.giraph.time.SystemTime;
39  import org.apache.giraph.time.Time;
40  import org.apache.giraph.time.Times;
41  import org.apache.hadoop.io.Writable;
42  import org.apache.hadoop.io.WritableComparable;
43  import org.apache.hadoop.mapreduce.InputSplit;
44  import org.apache.hadoop.mapreduce.Mapper;
45  import org.apache.log4j.Logger;
46  
47  import com.yammer.metrics.core.Counter;
48  import com.yammer.metrics.core.Meter;
49  import com.yammer.metrics.util.PercentGauge;
50  
51  /**
52   * Abstract base class for loading vertex/edge input splits.
53   * Every thread will has its own instance of WorkerClientRequestProcessor
54   * to send requests.
55   *
56   * @param <I> Vertex index value
57   * @param <V> Vertex value
58   * @param <E> Edge value
59   */
60  public abstract class InputSplitsCallable<I extends WritableComparable,
61      V extends Writable, E extends Writable>
62      implements Callable<VertexEdgeCount> {
63    /** Class logger */
64    private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
65    /** Class time object */
66    private static final Time TIME = SystemTime.get();
67    /** Configuration */
68    protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
69    /** Context */
70    protected final Mapper<?, ?, ?, ?>.Context context;
71    /** Handles IPC communication */
72    protected final WorkerClientRequestProcessor<I, V, E>
73    workerClientRequestProcessor;
74    /**
75     * Stores and processes the list of InputSplits advertised
76     * in a tree of child znodes by the master.
77     */
78    private final WorkerInputSplitsHandler splitsHandler;
79    /** Get the start time in nanos */
80    private final long startNanos = TIME.getNanoseconds();
81    /** Whether to prioritize local input splits. */
82    private final boolean useLocality;
83    /** Service worker */
84    private final CentralizedServiceWorker<I, V, E> serviceWorker;
85  
86    /**
87     * Constructor.
88     *
89     * @param context Context
90     * @param configuration Configuration
91     * @param bspServiceWorker service worker
92     * @param splitsHandler Handler for input splits
93     */
94    public InputSplitsCallable(
95        Mapper<?, ?, ?, ?>.Context context,
96        ImmutableClassesGiraphConfiguration<I, V, E> configuration,
97        BspServiceWorker<I, V, E> bspServiceWorker,
98        WorkerInputSplitsHandler splitsHandler) {
99      this.context = context;
100     this.workerClientRequestProcessor =
101         new NettyWorkerClientRequestProcessor<I, V, E>(
102             context, configuration, bspServiceWorker,
103             false /* useOneMessageToManyIdsEncoding, not useful for input */);
104     this.useLocality = configuration.useInputSplitLocality();
105     this.splitsHandler = splitsHandler;
106     this.configuration = configuration;
107     this.serviceWorker = bspServiceWorker;
108   }
109 
110   /**
111    * Get input format
112    *
113    * @return Input format
114    */
115   public abstract GiraphInputFormat getInputFormat();
116 
117   /**
118    * Get input type
119    *
120    * @return Input type
121    */
122   public abstract InputType getInputType();
123 
124   /**
125    * Get Meter tracking edges loaded
126    *
127    * @return Meter tracking edges loaded
128    */
129   public static Meter getTotalEdgesLoadedMeter() {
130     return GiraphMetrics.get().perJobRequired()
131         .getMeter(MeterDesc.EDGES_LOADED);
132   }
133 
134   /**
135    * Get Counter tracking edges filtered
136    *
137    * @return Counter tracking edges filtered
138    */
139   public static Counter getTotalEdgesFilteredCounter() {
140     return GiraphMetrics.get().perJobRequired()
141         .getCounter(MetricNames.EDGES_FILTERED);
142   }
143 
144   /**
145    * Get Meter tracking number of vertices loaded.
146    *
147    * @return Meter for vertices loaded
148    */
149   public static Meter getTotalVerticesLoadedMeter() {
150     return GiraphMetrics.get().perJobRequired()
151         .getMeter(MeterDesc.VERTICES_LOADED);
152   }
153 
154   /**
155    * Get Counter tracking vertices filtered
156    *
157    * @return Counter tracking vertices filtered
158    */
159   public static Counter getTotalVerticesFilteredCounter() {
160     return GiraphMetrics.get().perJobRequired()
161         .getCounter(MetricNames.VERTICES_FILTERED);
162   }
163 
164   /**
165    * Initialize metrics used by this class and its subclasses.
166    */
167   public static void initMetrics() {
168     GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
169 
170     final Counter edgesFiltered = getTotalEdgesFilteredCounter();
171     final Meter edgesLoaded = getTotalEdgesLoadedMeter();
172 
173     metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
174       @Override protected double getNumerator() {
175         return edgesFiltered.count();
176       }
177 
178       @Override protected double getDenominator() {
179         return edgesLoaded.count();
180       }
181     });
182 
183     final Counter verticesFiltered = getTotalVerticesFilteredCounter();
184     final Meter verticesLoaded = getTotalVerticesLoadedMeter();
185 
186     metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
187       @Override protected double getNumerator() {
188         return verticesFiltered.count();
189       }
190 
191       @Override protected double getDenominator() {
192         return verticesLoaded.count();
193       }
194     });
195   }
196 
197   /**
198    * Load vertices/edges from the given input split.
199    *
200    * @param inputSplit Input split to load
201    * @return Count of vertices and edges loaded
202    * @throws IOException
203    * @throws InterruptedException
204    */
205   protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
206     throws IOException, InterruptedException;
207 
208   @Override
209   public VertexEdgeCount call() {
210     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
211     byte[] serializedInputSplit;
212     int inputSplitsProcessed = 0;
213     try {
214       OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
215       if (oocEngine != null) {
216         oocEngine.processingThreadStart();
217       }
218       while ((serializedInputSplit =
219           splitsHandler.reserveInputSplit(getInputType())) != null) {
220         // If out-of-core mechanism is used, check whether this thread
221         // can stay active or it should temporarily suspend and stop
222         // processing and generating more data for the moment.
223         if (oocEngine != null) {
224           oocEngine.activeThreadCheckIn();
225         }
226         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
227             loadInputSplit(serializedInputSplit));
228         context.progress();
229         ++inputSplitsProcessed;
230       }
231       if (oocEngine != null) {
232         oocEngine.processingThreadFinish();
233       }
234     } catch (InterruptedException e) {
235       throw new IllegalStateException("call: InterruptedException", e);
236     } catch (IOException e) {
237       throw new IllegalStateException("call: IOException", e);
238     } catch (ClassNotFoundException e) {
239       throw new IllegalStateException("call: ClassNotFoundException", e);
240     }
241 
242     if (LOG.isInfoEnabled()) {
243       float seconds = Times.getNanosSince(TIME, startNanos) /
244           Time.NS_PER_SECOND_AS_FLOAT;
245       float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
246       float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
247       LOG.info("call: Loaded " + inputSplitsProcessed + " " +
248           "input splits in " + seconds + " secs, " + vertexEdgeCount +
249           " " + verticesPerSecond + " vertices/sec, " +
250           edgesPerSecond + " edges/sec");
251     }
252     try {
253       workerClientRequestProcessor.flush();
254     } catch (IOException e) {
255       throw new IllegalStateException("call: Flushing failed.", e);
256     }
257     return vertexEdgeCount;
258   }
259 
260   /**
261    * Extract vertices from input split, saving them into a mini cache of
262    * partitions.  Periodically flush the cache of vertices when a limit is
263    * reached in readVerticeFromInputSplit.
264    * Mark the input split finished when done.
265    *
266    * @param serializedInputSplit Serialized input split
267    * @return Mapping of vertex indices and statistics, or null if no data read
268    * @throws IOException
269    * @throws ClassNotFoundException
270    * @throws InterruptedException
271    */
272   private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
273       throws IOException, ClassNotFoundException, InterruptedException {
274     InputSplit inputSplit = getInputSplit(serializedInputSplit);
275     VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
276     if (LOG.isInfoEnabled()) {
277       LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
278     }
279     return vertexEdgeCount;
280   }
281 
282   /**
283    * Talk to ZooKeeper to convert the input split path to the actual
284    * InputSplit.
285    *
286    * @param serializedInputSplit Serialized input split
287    * @return instance of InputSplit
288    * @throws IOException
289    * @throws ClassNotFoundException
290    */
291   protected InputSplit getInputSplit(byte[] serializedInputSplit)
292       throws IOException, ClassNotFoundException {
293     DataInputStream inputStream =
294         new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
295     InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
296 
297     if (LOG.isInfoEnabled()) {
298       LOG.info("getInputSplit: Reserved input split '" +
299           inputSplit.toString() + "'");
300     }
301     return inputSplit;
302   }
303 }