NettyWorkerClient xref

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.comm.netty;

import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yammer.metrics.core.Counter;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Takes the user-facing APIs in {@link WorkerClient} and implements them
 * using the available {@link WritableRequest} objects.
 *
 * @param <I> Vertex id
 * @param <V> Vertex data
 * @param <E> Edge data
 */
@SuppressWarnings("rawtypes")
public class NettyWorkerClient<I extends WritableComparable,
    V extends Writable, E extends Writable> implements
    WorkerClient<I, V, E>, ResetSuperstepMetricsObserver {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
  /** Hadoop configuration */
  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
  /** Netty client that does the actual I/O */
  private final NettyClient nettyClient;
  /** Centralized service, needed to get vertex ranges */
  private final CentralizedServiceWorker<I, V, E> service;

  // Metrics
  /** Per-superstep, per-request counters */
  private final Map<RequestType, Counter> superstepRequestCounters;

  /**
   * Only constructor.
   *
   * @param context Context from mapper
   * @param configuration Configuration
   * @param service Used to get partition mapping
   * @param exceptionHandler Handler for uncaught exceptions; will
   *                         terminate the job.
   */
  public NettyWorkerClient(
      Mapper<?, ?, ?, ?>.Context context,
      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
      CentralizedServiceWorker<I, V, E> service,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    this.nettyClient =
        new NettyClient(context, configuration, service.getWorkerInfo(),
            exceptionHandler);
    this.conf = configuration;
    this.service = service;
    this.superstepRequestCounters = Maps.newHashMap();
    GiraphMetrics.get().addSuperstepResetObserver(this);
  }
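
  // Construction sketch (hedged; the variable names below are illustrative
  // and not defined in this file). The worker-side BSP service typically
  // creates this client while it initializes, roughly:
  //
  //   NettyWorkerClient<LongWritable, DoubleWritable, NullWritable> client =
  //       new NettyWorkerClient<LongWritable, DoubleWritable, NullWritable>(
  //           mapperContext, giraphConfiguration, serviceWorker,
  //           uncaughtExceptionHandler);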

  @Override
  public void newSuperstep(SuperstepMetricsRegistry metrics) {
    superstepRequestCounters.clear();
    superstepRequestCounters.put(RequestType.SEND_WORKER_VERTICES_REQUEST,
        metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST,
        metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));
    superstepRequestCounters.put(
        RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST,
        metrics.getCounter(
            MetricNames.SEND_PARTITION_CURRENT_MESSAGES_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_PARTITION_MUTATIONS_REQUEST,
        metrics.getCounter(MetricNames.SEND_PARTITION_MUTATIONS_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_WORKER_AGGREGATORS_REQUEST,
        metrics.getCounter(MetricNames.SEND_WORKER_AGGREGATORS_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST,
        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_MASTER_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST,
        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_OWNER_REQUESTS));
    superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST,
        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
  }

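  /**
   * Get the centralized service worker this client uses for partition to
   * worker mapping.
   *
   * @return Centralized service worker
   */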
  public CentralizedServiceWorker<I, V, E> getService() {
    return service;
  }

  @Override
  public void openConnections() {
    List<TaskInfo> addresses = Lists.newArrayListWithCapacity(
        service.getWorkerInfoList().size());
    for (WorkerInfo info : service.getWorkerInfoList()) {
      // No need to connect to myself
      if (service.getWorkerInfo().getTaskId() != info.getTaskId()) {
        addresses.add(info);
      }
    }
    addresses.add(service.getMasterInfo());
    nettyClient.connectAllAddresses(addresses);
  }

  @Override
  public PartitionOwner getVertexPartitionOwner(I vertexId) {
    return service.getVertexPartitionOwner(vertexId);
  }

  @Override
  public void sendWritableRequest(int destTaskId,
                                  WritableRequest request) {
    Counter counter = superstepRequestCounters.get(request.getType());
    if (counter != null) {
      counter.inc();
    }
    nettyClient.sendWritableRequest(destTaskId, request);
  }

  @Override
  public void waitAllRequests() {
    nettyClient.waitAllRequests();
  }

  @Override
  public void closeConnections() throws IOException {
    nettyClient.stop();
  }

/*if[HADOOP_NON_SECURE]
  @Override
  public void setup() {
    openConnections();
  }
else[HADOOP_NON_SECURE]*/
  @Override
  public void setup(boolean authenticate) {
    openConnections();
    if (authenticate) {
      authenticate();
    }
  }
/*end[HADOOP_NON_SECURE]*/

/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]*/
  @Override
  public void authenticate() {
    nettyClient.authenticate();
  }

/*end[HADOOP_NON_SECURE]*/

  @Override
  public FlowControl getFlowControl() {
    return nettyClient.getFlowControl();
  }
}
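
A minimal lifecycle sketch follows (not part of the class above). It assumes
the secure-Hadoop build, where setup takes an authenticate flag, and that the
PartitionOwner returned by getVertexPartitionOwner exposes its worker via
getWorkerInfo(); the helper name sendOneRequest and the client, vertexId and
request arguments are hypothetical placeholders for whatever the calling
worker code supplies.

    // Hypothetical helper, shown only to illustrate the call sequence a
    // worker goes through with its client; it is not part of Giraph.
    static <I extends WritableComparable, V extends Writable,
        E extends Writable> void sendOneRequest(
        NettyWorkerClient<I, V, E> client, I vertexId,
        WritableRequest request) throws IOException {
      // Open channels to the other workers and the master, authenticating
      // if the cluster requires it.
      client.setup(true);
      // Route by vertex id: the partition owner records which worker task
      // currently holds that vertex's partition.
      int destTaskId = client.getVertexPartitionOwner(vertexId)
          .getWorkerInfo().getTaskId();
      client.sendWritableRequest(destTaskId, request);
      // Block until all outstanding requests are confirmed, then close the
      // underlying Netty connections.
      client.waitAllRequests();
      client.closeConnections();
    }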