View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.graph;
20  
21  import org.apache.giraph.conf.GiraphConfiguration;
22  import org.apache.giraph.job.ClientThriftServer;
23  import org.apache.giraph.job.JobProgressTracker;
24  import org.apache.giraph.worker.WorkerProgress;
25  import org.apache.log4j.Logger;
26  
27  import com.facebook.nifty.client.FramedClientConnector;
28  import com.facebook.nifty.client.NettyClientConfigBuilder;
29  import com.facebook.nifty.client.NiftyClient;
30  import com.facebook.swift.codec.ThriftCodec;
31  import com.facebook.swift.codec.ThriftCodecManager;
32  import com.facebook.swift.service.RuntimeTTransportException;
33  import com.facebook.swift.service.ThriftClientEventHandler;
34  import com.facebook.swift.service.ThriftClientManager;
35  import com.google.common.collect.ImmutableSet;
36  import com.google.common.io.Closeables;
37  
38  import java.io.IOException;
39  import java.net.InetSocketAddress;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.RejectedExecutionException;
42  
43  /**
44   * Wrapper around JobProgressTracker which retires to connect and swallows
45   * exceptions so app wouldn't crash if something goes wrong with progress
46   * reports.
47   */
48  public class RetryableJobProgressTrackerClient
49      implements JobProgressTrackerClient {
50    /** Class logger */
51    private static final Logger LOG =
52        Logger.getLogger(RetryableJobProgressTrackerClient.class);
53    /** Configuration */
54    private final GiraphConfiguration conf;
55    /** Thrift client manager to use to connect to job progress tracker */
56    private ThriftClientManager clientManager;
57    /** Job progress tracker */
58    private JobProgressTracker jobProgressTracker;
59  
60    /**
61     * Constructor
62     *
63     * @param conf Giraph configuration
64     */
65    public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
66        ExecutionException, InterruptedException {
67      this.conf = conf;
68      resetConnection();
69    }
70  
71    /**
72     * Try to establish new connection to JobProgressTracker
73     */
74    private void resetConnection() throws ExecutionException,
75        InterruptedException {
76      clientManager = new ThriftClientManager(
77          new ThriftCodecManager(new ThriftCodec[0]),
78          new NiftyClient(
79              new NettyClientConfigBuilder().setWorkerThreadCount(2).build()),
80          ImmutableSet.<ThriftClientEventHandler>of());
81      FramedClientConnector connector =
82          new FramedClientConnector(new InetSocketAddress(
83              ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
84              ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
85      jobProgressTracker =
86          clientManager.createClient(connector, JobProgressTracker.class).get();
87  
88    }
89  
90    @Override
91    public synchronized void cleanup() throws IOException {
92      Closeables.close(clientManager, true);
93      try {
94        clientManager.close();
95        // CHECKSTYLE: stop IllegalCatch
96      } catch (Exception e) {
97        // CHECKSTYLE: resume IllegalCatch
98        if (LOG.isDebugEnabled()) {
99          LOG.debug(
100             "Exception occurred while trying to close JobProgressTracker", e);
101       }
102     }
103   }
104 
105   @Override
106   public synchronized void mapperStarted() {
107     executeWithRetry(new Runnable() {
108       @Override
109       public void run() {
110         jobProgressTracker.mapperStarted();
111       }
112     });
113   }
114 
115   @Override
116   public synchronized void logInfo(final String logLine) {
117     executeWithRetry(new Runnable() {
118       @Override
119       public void run() {
120         jobProgressTracker.logInfo(logLine);
121       }
122     });
123   }
124 
125   @Override
126   public synchronized void logError(final String logLine) {
127     executeWithRetry(new Runnable() {
128       @Override
129       public void run() {
130         jobProgressTracker.logError(logLine);
131       }
132     });
133   }
134 
135   @Override
136   public synchronized void logFailure(final String reason) {
137     executeWithRetry(new Runnable() {
138       @Override
139       public void run() {
140         jobProgressTracker.logFailure(reason);
141       }
142     });
143   }
144 
145   @Override
146   public synchronized void updateProgress(final WorkerProgress workerProgress) {
147     executeWithRetry(new Runnable() {
148       @Override
149       public void run() {
150         jobProgressTracker.updateProgress(workerProgress);
151       }
152     });
153   }
154 
155   /**
156    * Execute Runnable, if disconnected try to connect again and retry
157    *
158    * @param runnable Runnable to execute
159    */
160   private void executeWithRetry(Runnable runnable) {
161     try {
162       runnable.run();
163     } catch (RuntimeTTransportException | RejectedExecutionException te) {
164       if (LOG.isDebugEnabled()) {
165         LOG.debug(te.getClass() + " occurred while talking to " +
166             "JobProgressTracker server, trying to reconnect", te);
167       }
168       try {
169         try {
170           clientManager.close();
171           // CHECKSTYLE: stop IllegalCatch
172         } catch (Exception e) {
173           // CHECKSTYLE: resume IllegalCatch
174           if (LOG.isDebugEnabled()) {
175             LOG.debug(
176                 "Exception occurred while trying to close client manager", e);
177           }
178         }
179         resetConnection();
180         runnable.run();
181         // CHECKSTYLE: stop IllegalCatch
182       } catch (Exception e) {
183         // CHECKSTYLE: resume IllegalCatch
184         if (LOG.isInfoEnabled()) {
185           LOG.info("Exception occurred while talking to " +
186               "JobProgressTracker server, giving up", e);
187         }
188       }
189       // CHECKSTYLE: stop IllegalCatch
190     } catch (Exception e) {
191       // CHECKSTYLE: resume IllegalCatch
192       if (LOG.isInfoEnabled()) {
193         LOG.info("Exception occurred while talking to " +
194             "JobProgressTracker server, giving up", e);
195       }
196     }
197   }
198 }