/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty;

import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.RequestDecoder;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.ResponseEncoder;
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/*if_not[HADOOP_NON_SECURE]*/
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.channel.AdaptiveRecvByteBufAllocator;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.MAX_IPC_PORT_BIND_ATTEMPTS;

/**
 * This server uses Netty and implements the server side of all Giraph
 * communication.
 */
public class NettyServer {
  /** Default maximum thread pool size */
  public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;

/*if_not[HADOOP_NON_SECURE]*/
  /** Used to authenticate with netty clients */
  public static final AttributeKey<SaslNettyServer>
  CHANNEL_SASL_NETTY_SERVERS = AttributeKey.valueOf("channelSaslServers");
/*end[HADOOP_NON_SECURE]*/

  /** Class logger */
  private static final Logger LOG = Logger.getLogger(NettyServer.class);
  /** Configuration */
  private final ImmutableClassesGiraphConfiguration conf;
  /** Progressable for reporting progress */
  private final Progressable progressable;
  /** Accepted channels */
  private final ChannelGroup accepted = new DefaultChannelGroup(
      ImmediateEventExecutor.INSTANCE);
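  // Note: DefaultChannelGroup needs an EventExecutor to notify its group
  // futures; ImmediateEventExecutor runs those notifications on the calling
  // thread.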
  /** Local hostname */
  private final String localHostOrIp;
  /** Address of the server */
  private InetSocketAddress myAddress;
  /** Current task info */
  private TaskInfo myTaskInfo;
  /** Maximum number of threads */
  private final int maxPoolSize;
  /** TCP backlog */
  private final int tcpBacklog;
  /** Factory for {@link RequestServerHandler} */
  private final RequestServerHandler.Factory requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
  /** Factory for {@link SaslServerHandler} */
  private SaslServerHandler.Factory saslServerHandlerFactory;
/*end[HADOOP_NON_SECURE]*/
  /** Server bootstrap */
  private ServerBootstrap bootstrap;
  /** Inbound byte counter for this server */
  private final InboundByteCounter inByteCounter = new InboundByteCounter();
  /** Outbound byte counter for this server */
  private final OutboundByteCounter outByteCounter = new OutboundByteCounter();
  /** Send buffer size */
  private final int sendBufferSize;
  /** Receive buffer size */
  private final int receiveBufferSize;
  /** Boss eventloop group */
  private final EventLoopGroup bossGroup;
  /** Worker eventloop group */
  private final EventLoopGroup workerGroup;
  /** Request completed map per worker */
  private final WorkerRequestReservedMap workerRequestReservedMap;
  /** Use execution group? */
  private final boolean useExecutionGroup;
  /** Execution handler (if used) */
  private final EventExecutorGroup executionGroup;
  /** Name of the handler before the execution handler (if used) */
  private final String handlerToUseExecutionGroup;
  /** Handles all uncaught exceptions in netty threads */
  private final Thread.UncaughtExceptionHandler exceptionHandler;


  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param exceptionHandler handle uncaught exceptions
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
      RequestServerHandler.Factory requestServerHandlerFactory,
      TaskInfo myTaskInfo, Progressable progressable,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    this.conf = conf;
    this.progressable = progressable;
    this.requestServerHandlerFactory = requestServerHandlerFactory;
/*if_not[HADOOP_NON_SECURE]*/
    this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
    this.myTaskInfo = myTaskInfo;
    this.exceptionHandler = exceptionHandler;
    sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
    receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);

    workerRequestReservedMap = new WorkerRequestReservedMap(conf);

    maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);

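    // The boss group accepts incoming connections; accepted channels are then
    // registered with the worker group, which handles all of their I/O.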
    bossGroup = new NioEventLoopGroup(4,
        ThreadUtils.createThreadFactory(
            "netty-server-boss-%d", exceptionHandler));

    workerGroup = new NioEventLoopGroup(maxPoolSize,
        ThreadUtils.createThreadFactory(
            "netty-server-worker-%d", exceptionHandler));

    try {
      this.localHostOrIp = conf.getLocalHostOrIp();
    } catch (UnknownHostException e) {
      throw new IllegalStateException(
          "NettyServer: unable to get hostname", e);
    }

    tcpBacklog = conf.getInt(GiraphConstants.TCP_BACKLOG.getKey(),
        conf.getInt(GiraphConstants.MAX_WORKERS,
            GiraphConstants.TCP_BACKLOG.getDefaultValue()));

    handlerToUseExecutionGroup =
        GiraphConstants.NETTY_SERVER_EXECUTION_AFTER_HANDLER.get(conf);
    useExecutionGroup =
        GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.get(conf);
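    // If enabled, the execution group keeps heavy request processing off the
    // Netty I/O threads: the handler configured via
    // NETTY_SERVER_EXECUTION_AFTER_HANDLER is bound to this separate executor
    // group (see PipelineUtils.addLastWithExecutorCheck for the exact wiring).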
    if (useExecutionGroup) {
      int executionThreads = conf.getNettyServerExecutionThreads();
      executionGroup = new DefaultEventExecutorGroup(executionThreads,
          ThreadUtils.createThreadFactory(
              "netty-server-exec-%d", exceptionHandler));
      if (LOG.isInfoEnabled()) {
        LOG.info("NettyServer: Using execution group with " +
            executionThreads + " threads for " +
            handlerToUseExecutionGroup + ".");
      }
    } else {
      executionGroup = null;
    }
  }

/*if_not[HADOOP_NON_SECURE]*/
  /**
   * Constructor for creating the server
   *
   * @param conf Configuration to use
   * @param requestServerHandlerFactory Factory for request handlers
   * @param myTaskInfo Current task info
   * @param progressable Progressable for reporting progress
   * @param saslServerHandlerFactory  Factory for SASL handlers
   * @param exceptionHandler handle uncaught exceptions
   */
  public NettyServer(ImmutableClassesGiraphConfiguration conf,
                     RequestServerHandler.Factory requestServerHandlerFactory,
                     TaskInfo myTaskInfo,
                     Progressable progressable,
                     SaslServerHandler.Factory saslServerHandlerFactory,
                     Thread.UncaughtExceptionHandler exceptionHandler) {
    this(conf, requestServerHandlerFactory, myTaskInfo,
        progressable, exceptionHandler);
    this.saslServerHandlerFactory = saslServerHandlerFactory;
  }
/*end[HADOOP_NON_SECURE]*/

  /**
   * Returns a handle on the in-bound byte counter.
   * @return The {@link InboundByteCounter} object for this server.
   */
  public InboundByteCounter getInByteCounter() {
    return inByteCounter;
  }

  /**
   * Start the server with the appropriate port
   */
  public void start() {
    bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, tcpBacklog)
        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, sendBufferSize)
        .childOption(ChannelOption.SO_RCVBUF, receiveBufferSize)
        .childOption(ChannelOption.ALLOCATOR, conf.getNettyAllocator())
        .childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(receiveBufferSize / 4,
                receiveBufferSize, receiveBufferSize));
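    // Note: option() above applies to the listening socket, while
    // childOption() applies to each accepted connection. The receive-buffer
    // allocator is AdaptiveRecvByteBufAllocator(minimum, initial, maximum),
    // so per-channel read buffers start at receiveBufferSize and may shrink
    // to a quarter of it, but never grow beyond it.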

    /**
     * Pipeline setup: depends on whether configured to use authentication
     * or not.
     */
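    // With authentication the child pipeline is:
    //   inbound byte counter, optional compression decoder, outbound byte
    //   counter, optional compression encoder, requestFrameDecoder,
    //   requestDecoder, saslServerHandler, authorizeServerHandler,
    //   requestServerHandler, responseEncoder.
    // Without authentication it is the same minus the SASL, authorize and
    // response-encoder handlers, plus a small handler that records connected
    // channels.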
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
/*if_not[HADOOP_NON_SECURE]*/
        if (conf.authenticate()) {
          LOG.info("start: Will use Netty pipeline with " +
              "authentication and authorization of clients.");
          // After a client authenticates, the two authentication-specific
          // pipeline components SaslServerHandler and ResponseEncoder are
          // removed, leaving the pipeline the same as in the non-authenticated
          // configuration except for the presence of the Authorize component.
          PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
              inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                conf.getNettyCompressionDecoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
              outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                conf.getNettyCompressionEncoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
              handlerToUseExecutionGroup, executionGroup, ch);
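          // LengthFieldBasedFrameDecoder arguments: frames up to 1 GiB, a
          // 4-byte length field at offset 0, no length adjustment, and the
          // 4-byte length prefix stripped before the frame is passed on.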
          PipelineUtils.addLastWithExecutorCheck("requestDecoder",
              new RequestDecoder(conf, inByteCounter),
              handlerToUseExecutionGroup, executionGroup, ch);
          // Removed after authentication completes:
          PipelineUtils.addLastWithExecutorCheck("saslServerHandler",
              saslServerHandlerFactory.newHandler(conf),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("authorizeServerHandler",
              new AuthorizeServerHandler(), handlerToUseExecutionGroup,
              executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
              requestServerHandlerFactory.newHandler(workerRequestReservedMap,
                  conf, myTaskInfo, exceptionHandler),
              handlerToUseExecutionGroup, executionGroup, ch);
          // Removed after authentication completes:
          PipelineUtils.addLastWithExecutorCheck("responseEncoder",
              new ResponseEncoder(), handlerToUseExecutionGroup,
              executionGroup, ch);
        } else {
          LOG.info("start: Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
          // Store all connected channels in order to ensure that we can close
          // them on stop(), or else stop() may hang waiting for the
          // connections to close on their own
          ch.pipeline().addLast("connectedChannels",
              new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx)
                  throws Exception {
                  accepted.add(ctx.channel());
                  ctx.fireChannelActive();
                }
              });
          PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
              inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
                conf.getNettyCompressionDecoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
              outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
          if (conf.doCompression()) {
            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
                conf.getNettyCompressionEncoder(),
                handlerToUseExecutionGroup, executionGroup, ch);
          }
          PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
              new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestDecoder",
              new RequestDecoder(conf, inByteCounter),
              handlerToUseExecutionGroup, executionGroup, ch);
          PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
              requestServerHandlerFactory.newHandler(
                  workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
              handlerToUseExecutionGroup, executionGroup, ch);
/*if_not[HADOOP_NON_SECURE]*/
        }
/*end[HADOOP_NON_SECURE]*/
      }
    });

    int taskId = conf.getTaskPartition();
    int numTasks = conf.getInt("mapred.map.tasks", 1);
    // Number of workers + 1 for master
    int numServers = conf.getInt(GiraphConstants.MAX_WORKERS, numTasks) + 1;
    int portIncrementConstant =
        (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
    int bindPort = GiraphConstants.IPC_INITIAL_PORT.get(conf) + taskId;
    int bindAttempts = 0;
    final int maxIpcPortBindAttempts = MAX_IPC_PORT_BIND_ATTEMPTS.get(conf);
    final boolean failFirstPortBindingAttempt =
        GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.get(conf);
    // Simple handling of port collisions on the same machine while
    // preserving debuggability from the port number alone.
    // Round up the max number of workers to the next power of 10 and use
    // it as a constant to increase the port number with.
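    // For example (illustrative numbers): with 130 servers the increment is
    // 10^ceil(log10(130)) = 1000, so a task whose first choice is port 30007
    // retries 31007, 32007, ... on successive bind attempts.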
    while (bindAttempts < maxIpcPortBindAttempts) {
      this.myAddress = new InetSocketAddress(localHostOrIp, bindPort);
      if (failFirstPortBindingAttempt && bindAttempts == 0) {
        if (LOG.isInfoEnabled()) {
          LOG.info("start: Intentionally fail first " +
              "binding attempt as giraph.failFirstIpcPortBindAttempt " +
              "is true, port " + bindPort);
        }
        ++bindAttempts;
        bindPort += portIncrementConstant;
        continue;
      }

      try {
        ChannelFuture f = bootstrap.bind(myAddress).sync();
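        // The listening channel joins the same group as accepted client
        // channels, so stop() closes it together with all open connections.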
        accepted.add(f.channel());
        break;
      } catch (InterruptedException e) {
        throw new IllegalStateException(e);
        // CHECKSTYLE: stop IllegalCatchCheck
      } catch (Exception e) {
        // CHECKSTYLE: resume IllegalCatchCheck
        LOG.warn("start: Likely failed to bind on attempt " +
            bindAttempts + " to port " + bindPort, e.getCause());
        ++bindAttempts;
        bindPort += portIncrementConstant;
      }
    }
    if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
      throw new IllegalStateException(
          "start: Failed to start NettyServer with " +
              bindAttempts + " attempts");
    }

    if (LOG.isInfoEnabled()) {
      LOG.info("start: Started communication server: " + myAddress +
          " with up to " + maxPoolSize +
          " threads on bind attempt " + bindAttempts +
          " with sendBufferSize = " + sendBufferSize +
          " receiveBufferSize = " + receiveBufferSize);
    }
  }

  /**
   * Stop the server.
   */
  public void stop() {
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Halting netty server");
    }
    ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Start releasing resources");
    }
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    ProgressableUtils.awaitTerminationFuture(bossGroup, progressable);
    ProgressableUtils.awaitTerminationFuture(workerGroup, progressable);
    if (useExecutionGroup) {
      executionGroup.shutdownGracefully();
      ProgressableUtils.awaitTerminationFuture(executionGroup, progressable);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("stop: Netty server halted");
    }
  }

  /** @return Address this server is bound to */
  public InetSocketAddress getMyAddress() {
    return myAddress;
  }

  /** @return Local hostname or IP used by this server */
  public String getLocalHostOrIp() {
    return localHostOrIp;
  }

  /**
   * Inform the server about the flow control policy used in sending requests
   *
   * @param flowControl reference to the flow control used
   */
  public void setFlowControl(FlowControl flowControl) {
    checkState(requestServerHandlerFactory != null);
    requestServerHandlerFactory.setFlowControl(flowControl);
  }
}