This project has retired. For details please refer to its
Attic page.
NettyWorkerClient xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.comm.WorkerClient;
23 import org.apache.giraph.comm.flow_control.FlowControl;
24 import org.apache.giraph.comm.requests.RequestType;
25 import org.apache.giraph.comm.requests.WritableRequest;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.giraph.graph.TaskInfo;
28 import org.apache.giraph.metrics.GiraphMetrics;
29 import org.apache.giraph.metrics.MetricNames;
30 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
31 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
32 import org.apache.giraph.partition.PartitionOwner;
33 import org.apache.giraph.worker.WorkerInfo;
34 import org.apache.hadoop.io.Writable;
35 import org.apache.hadoop.io.WritableComparable;
36 import org.apache.hadoop.mapreduce.Mapper;
37 import org.apache.log4j.Logger;
38
39 import com.google.common.collect.Lists;
40 import com.google.common.collect.Maps;
41 import com.yammer.metrics.core.Counter;
42
43 import java.io.IOException;
44 import java.util.List;
45 import java.util.Map;
46
47
48
49
50
51
52
53
54
55 @SuppressWarnings("rawtypes")
56 public class NettyWorkerClient<I extends WritableComparable,
57 V extends Writable, E extends Writable> implements
58 WorkerClient<I, V, E>, ResetSuperstepMetricsObserver {
59
60 private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
61
62 private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
63
64 private final NettyClient nettyClient;
65
66 private final CentralizedServiceWorker<I, V, E> service;
67
68
69
70 private final Map<RequestType, Counter> superstepRequestCounters;
71
72
73
74
75
76
77
78
79
80
81 public NettyWorkerClient(
82 Mapper<?, ?, ?, ?>.Context context,
83 ImmutableClassesGiraphConfiguration<I, V, E> configuration,
84 CentralizedServiceWorker<I, V, E> service,
85 Thread.UncaughtExceptionHandler exceptionHandler) {
86 this.nettyClient =
87 new NettyClient(context, configuration, service.getWorkerInfo(),
88 exceptionHandler);
89 this.conf = configuration;
90 this.service = service;
91 this.superstepRequestCounters = Maps.newHashMap();
92 GiraphMetrics.get().addSuperstepResetObserver(this);
93 }
94
95 @Override
96 public void newSuperstep(SuperstepMetricsRegistry metrics) {
97 superstepRequestCounters.clear();
98 superstepRequestCounters.put(RequestType.SEND_WORKER_VERTICES_REQUEST,
99 metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
100 superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST,
101 metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));
102 superstepRequestCounters.put(
103 RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST,
104 metrics.getCounter(
105 MetricNames.SEND_PARTITION_CURRENT_MESSAGES_REQUESTS));
106 superstepRequestCounters.put(RequestType.SEND_PARTITION_MUTATIONS_REQUEST,
107 metrics.getCounter(MetricNames.SEND_PARTITION_MUTATIONS_REQUESTS));
108 superstepRequestCounters.put(RequestType.SEND_WORKER_AGGREGATORS_REQUEST,
109 metrics.getCounter(MetricNames.SEND_WORKER_AGGREGATORS_REQUESTS));
110 superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST,
111 metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_MASTER_REQUESTS));
112 superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST,
113 metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_OWNER_REQUESTS));
114 superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST,
115 metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
116 }
117
118 public CentralizedServiceWorker<I, V, E> getService() {
119 return service;
120 }
121
122 @Override
123 public void openConnections() {
124 List<TaskInfo> addresses = Lists.newArrayListWithCapacity(
125 service.getWorkerInfoList().size());
126 for (WorkerInfo info : service.getWorkerInfoList()) {
127
128 if (service.getWorkerInfo().getTaskId() != info.getTaskId()) {
129 addresses.add(info);
130 }
131 }
132 addresses.add(service.getMasterInfo());
133 nettyClient.connectAllAddresses(addresses);
134 }
135
136 @Override
137 public PartitionOwner getVertexPartitionOwner(I vertexId) {
138 return service.getVertexPartitionOwner(vertexId);
139 }
140
141 @Override
142 public void sendWritableRequest(int destTaskId,
143 WritableRequest request) {
144 Counter counter = superstepRequestCounters.get(request.getType());
145 if (counter != null) {
146 counter.inc();
147 }
148 nettyClient.sendWritableRequest(destTaskId, request);
149 }
150
151 @Override
152 public void waitAllRequests() {
153 nettyClient.waitAllRequests();
154 }
155
156 @Override
157 public void closeConnections() throws IOException {
158 nettyClient.stop();
159 }
160
161
162
163
164
165
166
167 @Override
168 public void setup(boolean authenticate) {
169 openConnections();
170 if (authenticate) {
171 authenticate();
172 }
173 }
174
175
176
177
178 @Override
179 public void authenticate() {
180 nettyClient.authenticate();
181 }
182
183
184
185 @Override
186 public FlowControl getFlowControl() {
187 return nettyClient.getFlowControl();
188 }
189 }