This project has retired. For details please refer to its Attic page.
NettyWorkerServer xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import org.apache.giraph.bsp.CentralizedServiceWorker;
22  import org.apache.giraph.comm.ServerData;
23  import org.apache.giraph.comm.WorkerServer;
24  import org.apache.giraph.comm.flow_control.FlowControl;
25  import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
26  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27  import org.apache.hadoop.io.Writable;
28  import org.apache.hadoop.io.WritableComparable;
29  import org.apache.hadoop.mapreduce.Mapper;
30  import org.apache.log4j.Logger;
31  
32  import java.net.InetSocketAddress;
33  
34  /**
35   * Netty worker server that implement {@link WorkerServer} and contains
36   * the actual {@link ServerData}.
37   *
38   * @param <I> Vertex id
39   * @param <V> Vertex data
40   * @param <E> Edge data
41   */
42  @SuppressWarnings("rawtypes")
43  public class NettyWorkerServer<I extends WritableComparable,
44      V extends Writable, E extends Writable>
45      implements WorkerServer<I, V, E> {
46    /** Class logger */
47    private static final Logger LOG =
48      Logger.getLogger(NettyWorkerServer.class);
49    /** Hadoop configuration */
50    private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
51    /** Service worker */
52    private final CentralizedServiceWorker<I, V, E> service;
53    /** Netty server that does that actual I/O */
54    private final NettyServer nettyServer;
55    /** Server data storage */
56    private final ServerData<I, V, E> serverData;
57    /** Mapper context */
58    private final Mapper<?, ?, ?, ?>.Context context;
59  
60    /**
61     * Constructor to start the server.
62     *
63     * @param conf Configuration
64     * @param service Service to get partition mappings
65     * @param context Mapper context
66     * @param exceptionHandler handle uncaught exceptions
67     */
68    public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
69        CentralizedServiceWorker<I, V, E> service,
70        Mapper<?, ?, ?, ?>.Context context,
71        Thread.UncaughtExceptionHandler exceptionHandler) {
72      this.conf = conf;
73      this.service = service;
74      this.context = context;
75  
76      serverData =
77          new ServerData<I, V, E>(service, this, conf, context);
78  
79      nettyServer = new NettyServer(conf,
80          new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
81          service.getWorkerInfo(), context, exceptionHandler);
82      nettyServer.start();
83    }
84  
85    @Override
86    public InetSocketAddress getMyAddress() {
87      return nettyServer.getMyAddress();
88    }
89  
90    @Override
91    public String getLocalHostOrIp() {
92      return nettyServer.getLocalHostOrIp();
93    }
94  
95    @Override
96    public void prepareSuperstep() {
97      serverData.prepareSuperstep(); // updates the current message-store
98    }
99  
100   @Override
101   public ServerData<I, V, E> getServerData() {
102     return serverData;
103   }
104 
105   @Override
106   public void close() {
107     nettyServer.stop();
108   }
109 
110   @Override
111   public void setFlowControl(FlowControl flowControl) {
112     nettyServer.setFlowControl(flowControl);
113   }
114 
115   @Override
116   public long getBytesReceivedPerSuperstep() {
117     return nettyServer.getInByteCounter().getBytesReceivedPerSuperstep();
118   }
119 
120   @Override
121   public void resetBytesReceivedPerSuperstep() {
122     nettyServer.getInByteCounter().resetBytesReceivedPerSuperstep();
123   }
124 
125   @Override
126   public long getBytesReceived() {
127     return nettyServer.getInByteCounter().getBytesReceived();
128   }
129 }