View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.flow_control;
20  
21  import com.yammer.metrics.core.Counter;
22  import org.apache.giraph.comm.netty.NettyClient;
23  import org.apache.giraph.comm.netty.handler.AckSignalFlag;
24  import org.apache.giraph.comm.requests.WritableRequest;
25  import org.apache.giraph.conf.FloatConfOption;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.giraph.conf.IntConfOption;
28  import org.apache.giraph.metrics.GiraphMetrics;
29  import org.apache.giraph.metrics.MetricNames;
30  import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
31  import org.apache.giraph.metrics.SuperstepMetricsRegistry;
32  import org.apache.log4j.Logger;
33  
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
37  
38  /**
39   * Representation of a flow control that limits the aggregate number of open
40   * requests to all other workers to a constant user-defined value
41   */
42  public class StaticFlowControl implements
43      FlowControl, ResetSuperstepMetricsObserver {
44    /** Maximum number of requests without confirmation we should have */
45    public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
46        new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
47            "Maximum number of requests without confirmation we should have");
48    /**
49     * After pausing a thread due to too large number of open requests,
50     * which fraction of these requests need to be closed before we continue
51     */
52    public static final FloatConfOption
53        FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
54        new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
55            0.2f, "Fraction of requests to close before proceeding");
56    /** Class logger */
57    private static final Logger LOG = Logger.getLogger(StaticFlowControl.class);
58  
59    /** Maximum number of requests without confirmation we can have */
60    private final int maxNumberOfOpenRequests;
61    /**
62     * Maximum number of requests that can be open after the pause in order to
63     * proceed
64     */
65    private final int numberOfRequestsToProceed;
66    /** Netty client used for sending requests */
67    private final NettyClient nettyClient;
68    /** Waiting interval for checking outstanding requests msecs */
69    private final int waitingRequestMsecs;
70    /** Dummy object to wait on until enough open requests get completed */
71    private final Object requestSpotAvailable = new Object();
72    /** Counter for time spent waiting on too many open requests */
73    private Counter timeWaitingOnOpenRequests;
74    /** Number of threads waiting on too many open requests */
75    private final AtomicInteger numWaitingThreads = new AtomicInteger(0);
76  
77    /**
78     * Constructor
79     *
80     * @param conf configuration
81     * @param nettyClient netty client
82     */
83    public StaticFlowControl(ImmutableClassesGiraphConfiguration conf,
84                             NettyClient nettyClient) {
85      this.nettyClient = nettyClient;
86      maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
87      numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
88          (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
89      if (LOG.isInfoEnabled()) {
90        LOG.info("StaticFlowControl: Limit number of open requests to " +
91            maxNumberOfOpenRequests + " and proceed when <= " +
92            numberOfRequestsToProceed);
93      }
94      waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
95      GiraphMetrics.get().addSuperstepResetObserver(this);
96    }
97  
98    @Override
99    public void newSuperstep(SuperstepMetricsRegistry metrics) {
100     timeWaitingOnOpenRequests = metrics.getCounter(
101         MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
102   }
103 
104   @Override
105   public void sendRequest(int destTaskId, WritableRequest request) {
106     nettyClient.doSend(destTaskId, request);
107     if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) {
108       long startTime = System.currentTimeMillis();
109       waitSomeRequests();
110       timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
111     }
112   }
113 
114   /**
115    * Ensure that at most numberOfRequestsToProceed are not complete.
116    * Periodically, check the state of every request.  If we find the connection
117    * failed, re-establish it and re-send the request.
118    */
119   private void waitSomeRequests() {
120     numWaitingThreads.getAndIncrement();
121     while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
122       // Wait for requests to complete for some time
123       synchronized (requestSpotAvailable) {
124         if (nettyClient.getNumberOfOpenRequests() <=
125             numberOfRequestsToProceed) {
126           break;
127         }
128         try {
129           requestSpotAvailable.wait(waitingRequestMsecs);
130         } catch (InterruptedException e) {
131           throw new IllegalStateException("waitSomeRequests: Got unexpected " +
132               "InterruptedException", e);
133         }
134       }
135       nettyClient.logAndSanityCheck();
136     }
137     numWaitingThreads.getAndDecrement();
138   }
139 
140   @Override
141   public void messageAckReceived(int taskId, long requestId, int response) {
142     synchronized (requestSpotAvailable) {
143       requestSpotAvailable.notifyAll();
144     }
145   }
146 
147   @Override
148   public AckSignalFlag getAckSignalFlag(int response) {
149     return AckSignalFlag.values()[response];
150   }
151 
152   @Override
153   public int calculateResponse(AckSignalFlag alreadyDone, int clientId) {
154     return alreadyDone.ordinal();
155   }
156 
157   @Override
158   public void shutdown() { }
159 
160   @Override
161   public void logInfo() {
162     if (LOG.isInfoEnabled()) {
163       LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
164           "until number of open requests falls below " +
165           numberOfRequestsToProceed);
166     }
167   }
168 
169   @Override
170   public void waitAllRequests() {
171     // This flow control policy does not keep any unsent request. All the open
172     // requests are in possession of the network client, so the network client
173     // will wait for them to complete. Thus, this flow control policy does not
174     // need to do anything regarding flushing the remaining requests.
175   }
176 
177   @Override
178   public int getNumberOfUnsentRequests() {
179     return 0;
180   }
181 }