This project has retired. For details please refer to its
Attic page.
NettyWorkerServer xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.comm.ServerData;
23 import org.apache.giraph.comm.WorkerServer;
24 import org.apache.giraph.comm.flow_control.FlowControl;
25 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.hadoop.io.Writable;
28 import org.apache.hadoop.io.WritableComparable;
29 import org.apache.hadoop.mapreduce.Mapper;
30 import org.apache.log4j.Logger;
31
32 import java.net.InetSocketAddress;
33
34
35
36
37
38
39
40
41
42 @SuppressWarnings("rawtypes")
43 public class NettyWorkerServer<I extends WritableComparable,
44 V extends Writable, E extends Writable>
45 implements WorkerServer<I, V, E> {
46
47 private static final Logger LOG =
48 Logger.getLogger(NettyWorkerServer.class);
49
50 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
51
52 private final CentralizedServiceWorker<I, V, E> service;
53
54 private final NettyServer nettyServer;
55
56 private final ServerData<I, V, E> serverData;
57
58 private final Mapper<?, ?, ?, ?>.Context context;
59
60
61
62
63
64
65
66
67
68 public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
69 CentralizedServiceWorker<I, V, E> service,
70 Mapper<?, ?, ?, ?>.Context context,
71 Thread.UncaughtExceptionHandler exceptionHandler) {
72 this.conf = conf;
73 this.service = service;
74 this.context = context;
75
76 serverData =
77 new ServerData<I, V, E>(service, this, conf, context);
78
79 nettyServer = new NettyServer(conf,
80 new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
81 service.getWorkerInfo(), context, exceptionHandler);
82 nettyServer.start();
83 }
84
85 @Override
86 public InetSocketAddress getMyAddress() {
87 return nettyServer.getMyAddress();
88 }
89
90 @Override
91 public String getLocalHostOrIp() {
92 return nettyServer.getLocalHostOrIp();
93 }
94
95 @Override
96 public void prepareSuperstep() {
97 serverData.prepareSuperstep();
98 }
99
100 @Override
101 public ServerData<I, V, E> getServerData() {
102 return serverData;
103 }
104
105 @Override
106 public void close() {
107 nettyServer.stop();
108 }
109
110 @Override
111 public void setFlowControl(FlowControl flowControl) {
112 nettyServer.setFlowControl(flowControl);
113 }
114
115 @Override
116 public long getBytesReceivedPerSuperstep() {
117 return nettyServer.getInByteCounter().getBytesReceivedPerSuperstep();
118 }
119
120 @Override
121 public void resetBytesReceivedPerSuperstep() {
122 nettyServer.getInByteCounter().resetBytesReceivedPerSuperstep();
123 }
124
125 @Override
126 public long getBytesReceived() {
127 return nettyServer.getInByteCounter().getBytesReceived();
128 }
129 }