This project has retired. For details please refer to its Attic page.
RetryableJobProgressTrackerClient xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.conf.IntConfOption;
23  import org.apache.giraph.counters.GiraphCountersThriftStruct;
24  import org.apache.giraph.job.ClientThriftServer;
25  import org.apache.giraph.job.JobProgressTracker;
26  import org.apache.giraph.master.MasterProgress;
27  import org.apache.giraph.utils.ThreadUtils;
28  import org.apache.giraph.worker.WorkerProgress;
29  import org.apache.log4j.Logger;
30  
31  import com.facebook.nifty.client.FramedClientConnector;
32  import com.facebook.nifty.client.NettyClientConfigBuilder;
33  import com.facebook.nifty.client.NiftyClient;
34  import com.facebook.swift.codec.ThriftCodec;
35  import com.facebook.swift.codec.ThriftCodecManager;
36  import com.facebook.swift.service.RuntimeTTransportException;
37  import com.facebook.swift.service.ThriftClientEventHandler;
38  import com.facebook.swift.service.ThriftClientManager;
39  import com.google.common.collect.ImmutableSet;
40  import com.google.common.io.Closeables;
41  
42  import java.io.IOException;
43  import java.net.InetSocketAddress;
44  import java.util.concurrent.ExecutionException;
45  import java.util.concurrent.RejectedExecutionException;
46  
47  /**
48   * Wrapper around JobProgressTracker which retries to connect and swallows
49   * exceptions so app wouldn't crash if something goes wrong with progress
50   * reports.
51   */
52  public class RetryableJobProgressTrackerClient
53      implements JobProgressTrackerClient {
54    /** Conf option for number of retries */
55    public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
56      new IntConfOption("giraph.job.progress.client.num.retries", 1,
57        "Number of times to retry a failed operation");
58    /** Conf option for wait time between retries */
59    public static final IntConfOption
60      RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
61      new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
62        "Time (msec) to wait between retries");
63    /** Class logger */
64    private static final Logger LOG =
65        Logger.getLogger(RetryableJobProgressTrackerClient.class);
66    /** Configuration */
67    private GiraphConfiguration conf;
68    /** Thrift client manager to use to connect to job progress tracker */
69    private ThriftClientManager clientManager;
70    /** Job progress tracker */
71    private JobProgressTracker jobProgressTracker;
72    /** Cached value for number of retries */
73    private int numRetries;
74    /** Cached value for wait time between retries */
75    private int retryWaitMsec;
76  
77    /**
78     * Default constructor. Typically once an instance is created it should be
79     * initialized by calling {@link #init(GiraphConfiguration)}.
80     */
81    public RetryableJobProgressTrackerClient() {
82    }
83  
84    /**
85     * Constructor
86     *
87     * @param conf Giraph configuration
88     */
89    public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
90        ExecutionException, InterruptedException {
91      this.conf = conf;
92      numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
93      retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
94      resetConnection();
95    }
96  
97    @Override
98    public void init(GiraphConfiguration conf) throws Exception {
99      this.conf = conf;
100     numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
101     retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
102     resetConnection();
103   }
104 
105   /**
106    * Try to establish new connection to JobProgressTracker
107    */
108   private void resetConnection() throws ExecutionException,
109       InterruptedException {
110     clientManager = new ThriftClientManager(
111         new ThriftCodecManager(new ThriftCodec[0]),
112         new NiftyClient(
113             new NettyClientConfigBuilder().setWorkerThreadCount(2).build()),
114         ImmutableSet.<ThriftClientEventHandler>of());
115     FramedClientConnector connector =
116         new FramedClientConnector(new InetSocketAddress(
117             ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
118             ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
119     jobProgressTracker =
120         clientManager.createClient(connector, JobProgressTracker.class).get();
121 
122   }
123 
124   @Override
125   public synchronized void cleanup() throws IOException {
126     Closeables.close(clientManager, true);
127     try {
128       clientManager.close();
129       // CHECKSTYLE: stop IllegalCatch
130     } catch (Exception e) {
131       // CHECKSTYLE: resume IllegalCatch
132       if (LOG.isDebugEnabled()) {
133         LOG.debug(
134             "Exception occurred while trying to close JobProgressTracker", e);
135       }
136     }
137   }
138 
139   @Override
140   public synchronized void mapperStarted() {
141     executeWithRetry(new Runnable() {
142       @Override
143       public void run() {
144         jobProgressTracker.mapperStarted();
145       }
146     }, numRetries);
147   }
148 
149   @Override
150   public synchronized void logInfo(final String logLine) {
151     executeWithRetry(new Runnable() {
152       @Override
153       public void run() {
154         jobProgressTracker.logInfo(logLine);
155       }
156     }, numRetries);
157   }
158 
159   @Override
160   public synchronized void logError(final String logLine,
161                                     final byte [] exByteArray) {
162     executeWithRetry(new Runnable() {
163       @Override
164       public void run() {
165         jobProgressTracker.logError(logLine, exByteArray);
166       }
167     }, numRetries);
168   }
169 
170   @Override
171   public synchronized void logFailure(final String reason) {
172     executeWithRetry(new Runnable() {
173       @Override
174       public void run() {
175         jobProgressTracker.logFailure(reason);
176       }
177     }, numRetries);
178   }
179 
180   @Override
181   public synchronized void updateProgress(final WorkerProgress workerProgress) {
182     executeWithRetry(new Runnable() {
183       @Override
184       public void run() {
185         jobProgressTracker.updateProgress(workerProgress);
186       }
187     }, numRetries);
188   }
189 
190   @Override
191   public void updateMasterProgress(final MasterProgress masterProgress) {
192     executeWithRetry(new Runnable() {
193       @Override
194       public void run() {
195         jobProgressTracker.updateMasterProgress(masterProgress);
196       }
197     }, numRetries);
198   }
199 
200   @Override
201   public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
202     executeWithRetry(new Runnable() {
203       @Override
204       public void run() {
205         jobProgressTracker.sendMasterCounters(giraphCounters);
206       }
207     }, numRetries);
208   }
209 
210   /**
211    * Execute Runnable, if disconnected try to connect again and retry
212    *
213    * @param runnable Runnable to execute
214    * @param numRetries Number of retries
215    */
216   private void executeWithRetry(Runnable runnable, int numRetries) {
217     try {
218       runnable.run();
219     } catch (RuntimeTTransportException | RejectedExecutionException te) {
220       if (LOG.isDebugEnabled()) {
221         LOG.debug(te.getClass() + " occurred while talking to " +
222           "JobProgressTracker server, trying to reconnect", te);
223       }
224       for (int i = 1; i <= numRetries; i++) {
225         try {
226           ThreadUtils.trySleep(retryWaitMsec);
227           retry(runnable);
228           break; // If the retry succeeded, we simply break from the loop
229 
230         } catch (RuntimeTTransportException | RejectedExecutionException e) {
231           // If a RuntimeTTTransportException happened, then we will retry
232           if (LOG.isInfoEnabled()) {
233             LOG.info("Exception occurred while talking to " +
234               "JobProgressTracker server after retry " + i +
235               " of " + numRetries, e);
236           }
237           // CHECKSTYLE: stop IllegalCatch
238         } catch (Exception e) {
239           // CHECKSTYLE: resume IllegalCatch
240           // If any other exception happened (e.g. application-specific),
241           // then we stop.
242           LOG.info("Exception occurred while talking to " +
243             "JobProgressTracker server after retry " + i +
244             " of " + numRetries + ", giving up", e);
245           break;
246         }
247       }
248       // CHECKSTYLE: stop IllegalCatch
249     } catch (Exception e) {
250       // CHECKSTYLE: resume IllegalCatch
251       if (LOG.isInfoEnabled()) {
252         LOG.info("Exception occurred while talking to " +
253           "JobProgressTracker server, giving up", e);
254       }
255     }
256   }
257 
258   /**
259    * Executes a single retry by closing the existing {@link #clientManager}
260    * connection, re-initializing it, and then executing the passed instance
261    * of {@link Runnable}.
262    *
263    * @param runnable Instance of {@link Runnable} to execute.
264    * @throws ExecutionException
265    * @throws InterruptedException
266    */
267   private void retry(Runnable runnable) throws ExecutionException,
268     InterruptedException {
269     try {
270       clientManager.close();
271       // CHECKSTYLE: stop IllegalCatch
272     } catch (Exception e) {
273       // CHECKSTYLE: resume IllegalCatch
274       if (LOG.isDebugEnabled()) {
275         LOG.debug(
276           "Exception occurred while trying to close client manager", e);
277       }
278     }
279     resetConnection();
280     runnable.run();
281   }
282 }