This project has retired. For details please refer to its
Attic page.
RetryableJobProgressTrackerClient xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.graph;
20
21 import org.apache.giraph.conf.GiraphConfiguration;
22 import org.apache.giraph.conf.IntConfOption;
23 import org.apache.giraph.counters.GiraphCountersThriftStruct;
24 import org.apache.giraph.job.ClientThriftServer;
25 import org.apache.giraph.job.JobProgressTracker;
26 import org.apache.giraph.master.MasterProgress;
27 import org.apache.giraph.utils.ThreadUtils;
28 import org.apache.giraph.worker.WorkerProgress;
29 import org.apache.log4j.Logger;
30
31 import com.facebook.nifty.client.FramedClientConnector;
32 import com.facebook.nifty.client.NettyClientConfigBuilder;
33 import com.facebook.nifty.client.NiftyClient;
34 import com.facebook.swift.codec.ThriftCodec;
35 import com.facebook.swift.codec.ThriftCodecManager;
36 import com.facebook.swift.service.RuntimeTTransportException;
37 import com.facebook.swift.service.ThriftClientEventHandler;
38 import com.facebook.swift.service.ThriftClientManager;
39 import com.google.common.collect.ImmutableSet;
40 import com.google.common.io.Closeables;
41
42 import java.io.IOException;
43 import java.net.InetSocketAddress;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.RejectedExecutionException;
46
47
48
49
50
51
52 public class RetryableJobProgressTrackerClient
53 implements JobProgressTrackerClient {
54
55 public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
56 new IntConfOption("giraph.job.progress.client.num.retries", 1,
57 "Number of times to retry a failed operation");
58
59 public static final IntConfOption
60 RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
61 new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
62 "Time (msec) to wait between retries");
63
64 private static final Logger LOG =
65 Logger.getLogger(RetryableJobProgressTrackerClient.class);
66
67 private GiraphConfiguration conf;
68
69 private ThriftClientManager clientManager;
70
71 private JobProgressTracker jobProgressTracker;
72
73 private int numRetries;
74
75 private int retryWaitMsec;
76
77
78
79
80
81 public RetryableJobProgressTrackerClient() {
82 }
83
84
85
86
87
88
89 public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
90 ExecutionException, InterruptedException {
91 this.conf = conf;
92 numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
93 retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
94 resetConnection();
95 }
96
97 @Override
98 public void init(GiraphConfiguration conf) throws Exception {
99 this.conf = conf;
100 numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
101 retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
102 resetConnection();
103 }
104
105
106
107
108 private void resetConnection() throws ExecutionException,
109 InterruptedException {
110 clientManager = new ThriftClientManager(
111 new ThriftCodecManager(new ThriftCodec[0]),
112 new NiftyClient(
113 new NettyClientConfigBuilder().setWorkerThreadCount(2).build()),
114 ImmutableSet.<ThriftClientEventHandler>of());
115 FramedClientConnector connector =
116 new FramedClientConnector(new InetSocketAddress(
117 ClientThriftServer.CLIENT_THRIFT_SERVER_HOST.get(conf),
118 ClientThriftServer.CLIENT_THRIFT_SERVER_PORT.get(conf)));
119 jobProgressTracker =
120 clientManager.createClient(connector, JobProgressTracker.class).get();
121
122 }
123
124 @Override
125 public synchronized void cleanup() throws IOException {
126 Closeables.close(clientManager, true);
127 try {
128 clientManager.close();
129
130 } catch (Exception e) {
131
132 if (LOG.isDebugEnabled()) {
133 LOG.debug(
134 "Exception occurred while trying to close JobProgressTracker", e);
135 }
136 }
137 }
138
139 @Override
140 public synchronized void mapperStarted() {
141 executeWithRetry(new Runnable() {
142 @Override
143 public void run() {
144 jobProgressTracker.mapperStarted();
145 }
146 }, numRetries);
147 }
148
149 @Override
150 public synchronized void logInfo(final String logLine) {
151 executeWithRetry(new Runnable() {
152 @Override
153 public void run() {
154 jobProgressTracker.logInfo(logLine);
155 }
156 }, numRetries);
157 }
158
159 @Override
160 public synchronized void logError(final String logLine,
161 final byte [] exByteArray) {
162 executeWithRetry(new Runnable() {
163 @Override
164 public void run() {
165 jobProgressTracker.logError(logLine, exByteArray);
166 }
167 }, numRetries);
168 }
169
170 @Override
171 public synchronized void logFailure(final String reason) {
172 executeWithRetry(new Runnable() {
173 @Override
174 public void run() {
175 jobProgressTracker.logFailure(reason);
176 }
177 }, numRetries);
178 }
179
180 @Override
181 public synchronized void updateProgress(final WorkerProgress workerProgress) {
182 executeWithRetry(new Runnable() {
183 @Override
184 public void run() {
185 jobProgressTracker.updateProgress(workerProgress);
186 }
187 }, numRetries);
188 }
189
190 @Override
191 public void updateMasterProgress(final MasterProgress masterProgress) {
192 executeWithRetry(new Runnable() {
193 @Override
194 public void run() {
195 jobProgressTracker.updateMasterProgress(masterProgress);
196 }
197 }, numRetries);
198 }
199
200 @Override
201 public void sendMasterCounters(GiraphCountersThriftStruct giraphCounters) {
202 executeWithRetry(new Runnable() {
203 @Override
204 public void run() {
205 jobProgressTracker.sendMasterCounters(giraphCounters);
206 }
207 }, numRetries);
208 }
209
210
211
212
213
214
215
216 private void executeWithRetry(Runnable runnable, int numRetries) {
217 try {
218 runnable.run();
219 } catch (RuntimeTTransportException | RejectedExecutionException te) {
220 if (LOG.isDebugEnabled()) {
221 LOG.debug(te.getClass() + " occurred while talking to " +
222 "JobProgressTracker server, trying to reconnect", te);
223 }
224 for (int i = 1; i <= numRetries; i++) {
225 try {
226 ThreadUtils.trySleep(retryWaitMsec);
227 retry(runnable);
228 break;
229
230 } catch (RuntimeTTransportException | RejectedExecutionException e) {
231
232 if (LOG.isInfoEnabled()) {
233 LOG.info("Exception occurred while talking to " +
234 "JobProgressTracker server after retry " + i +
235 " of " + numRetries, e);
236 }
237
238 } catch (Exception e) {
239
240
241
242 LOG.info("Exception occurred while talking to " +
243 "JobProgressTracker server after retry " + i +
244 " of " + numRetries + ", giving up", e);
245 break;
246 }
247 }
248
249 } catch (Exception e) {
250
251 if (LOG.isInfoEnabled()) {
252 LOG.info("Exception occurred while talking to " +
253 "JobProgressTracker server, giving up", e);
254 }
255 }
256 }
257
258
259
260
261
262
263
264
265
266
267 private void retry(Runnable runnable) throws ExecutionException,
268 InterruptedException {
269 try {
270 clientManager.close();
271
272 } catch (Exception e) {
273
274 if (LOG.isDebugEnabled()) {
275 LOG.debug(
276 "Exception occurred while trying to close client manager", e);
277 }
278 }
279 resetConnection();
280 runnable.run();
281 }
282 }