View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.ooc;
20  
21  import com.yammer.metrics.core.Counter;
22  import com.yammer.metrics.core.Histogram;
23  import org.apache.giraph.metrics.GiraphMetrics;
24  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
25  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
26  import org.apache.giraph.ooc.command.IOCommand;
27  import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
28  import org.apache.giraph.ooc.command.WaitIOCommand;
29  import org.apache.log4j.Logger;
30  
31  import java.util.concurrent.Callable;
32  
33  /**
34   * IO threads for out-of-core mechanism.
35   */
36  public class OutOfCoreIOCallable implements Callable<Void>,
37      ResetSuperstepMetricsObserver {
38    /** Name of Metric for number of bytes read from disk */
39    public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load";
40    /** Name of Metric for number of bytes written to disk */
41    public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store";
42    /** Name of Metric for size of loads */
43    public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes";
44    /** Name of Metric for size of stores */
45    public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes";
46    /** Class logger. */
47    private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
48    /** Out-of-core engine */
49    private final OutOfCoreEngine oocEngine;
50    /** Thread id/Disk id */
51    private final int diskId;
52    /** How many bytes of data is read from disk */
53    private Counter bytesReadPerSuperstep;
54    /** How many bytes of data is written to disk */
55    private Counter bytesWrittenPerSuperstep;
56    /** Size of load IO commands */
57    private Histogram histogramLoadSize;
58    /** Size of store IO commands */
59    private Histogram histogramStoreSize;
60  
61    /**
62     * Constructor
63     *
64     * @param oocEngine out-of-core engine
65     * @param diskId thread id/disk id
66     */
67    public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) {
68      this.oocEngine = oocEngine;
69      this.diskId = diskId;
70      newSuperstep(GiraphMetrics.get().perSuperstep());
71      GiraphMetrics.get().addSuperstepResetObserver(this);
72    }
73  
74    @Override
75    public Void call() throws Exception {
76      while (true) {
77        oocEngine.getSuperstepLock().readLock().lock();
78        IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
79        if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
80          LOG.info("call: thread " + diskId + "'s next IO command is: " +
81              command);
82        }
83        if (command == null) {
84          oocEngine.getSuperstepLock().readLock().unlock();
85          break;
86        }
87        if (command instanceof WaitIOCommand) {
88          oocEngine.getSuperstepLock().readLock().unlock();
89        }
90  
91        boolean commandExecuted = false;
92        long duration = 0;
93        long bytes;
94        // CHECKSTYLE: stop IllegalCatch
95        try {
96          long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
97              .getSuperstepGCTime();
98          long startTime = System.currentTimeMillis();
99          commandExecuted = command.execute();
100         duration = System.currentTimeMillis() - startTime;
101         timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
102             .getSuperstepGCTime() - timeInGC;
103         bytes = command.bytesTransferred();
104         if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
105           LOG.info("call: thread " + diskId + "'s command " + command +
106               " completed: bytes= " + bytes + ", duration=" + duration + ", " +
107               "bandwidth=" + String.format("%.2f", (double) bytes / duration *
108               1000 / 1024 / 1024) +
109               ((command instanceof WaitIOCommand) ? "" :
110                   (", bandwidth (excluding GC time)=" + String.format("%.2f",
111                       (double) bytes / (duration - timeInGC) *
112                           1000 / 1024 / 1024))));
113         }
114       } catch (Exception e) {
115         throw new RuntimeException(
116             "call: execution of IO command " + command + " failed!", e);
117       }
118       // CHECKSTYLE: resume IllegalCatch
119       if (!(command instanceof WaitIOCommand)) {
120         oocEngine.getSuperstepLock().readLock().unlock();
121         if (bytes != 0) {
122           if (command instanceof LoadPartitionIOCommand) {
123             bytesReadPerSuperstep.inc(bytes);
124             histogramLoadSize.update(bytes);
125           } else {
126             bytesWrittenPerSuperstep.inc(bytes);
127             histogramStoreSize.update(bytes);
128           }
129         }
130       }
131 
132       if (commandExecuted && duration > 0) {
133         oocEngine.getIOStatistics().update(command.getType(),
134             command.bytesTransferred(), duration);
135       }
136       oocEngine.getIOScheduler().ioCommandCompleted(command);
137     }
138     if (LOG.isInfoEnabled()) {
139       LOG.info("call: out-of-core IO thread " + diskId + " terminating!");
140     }
141     return null;
142   }
143 
144   @Override
145   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
146     bytesReadPerSuperstep = superstepMetrics.getCounter(BYTES_LOAD_FROM_DISK);
147     bytesWrittenPerSuperstep =
148         superstepMetrics.getCounter(BYTES_STORE_TO_DISK);
149     histogramLoadSize =
150         superstepMetrics.getUniformHistogram(HISTOGRAM_LOAD_SIZE);
151     histogramStoreSize =
152         superstepMetrics.getUniformHistogram(HISTOGRAM_STORE_SIZE);
153   }
154 }
155