This project has retired. For details please refer to its Attic page.
NettyMasterClient xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.io.IOException;
22  
23  import org.apache.giraph.bsp.CentralizedServiceMaster;
24  import org.apache.giraph.comm.GlobalCommType;
25  import org.apache.giraph.comm.MasterClient;
26  import org.apache.giraph.comm.aggregators.AggregatorUtils;
27  import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
28  import org.apache.giraph.comm.flow_control.FlowControl;
29  import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
30  import org.apache.giraph.comm.requests.WritableRequest;
31  import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
32  import org.apache.giraph.worker.WorkerInfo;
33  import org.apache.hadoop.io.Writable;
34  import org.apache.hadoop.mapreduce.Mapper;
35  import org.apache.hadoop.util.Progressable;
36  
37  /**
38   * Netty implementation of {@link MasterClient}
39   */
40  public class NettyMasterClient implements MasterClient {
41    /** Netty client that does the actual I/O */
42    private final NettyClient nettyClient;
43    /** Worker information for current superstep */
44    private final CentralizedServiceMaster<?, ?, ?> service;
45    /** Cached map of partition ids to serialized aggregator data */
46    private final SendGlobalCommCache sendGlobalCommCache =
47        new SendGlobalCommCache(true);
48    /** How big a single aggregator request can be */
49    private final int maxBytesPerAggregatorRequest;
50    /** Progressable used to report progress */
51    private final Progressable progressable;
52  
53    /**
54     * Constructor
55     *
56     * @param context Context from mapper
57     * @param configuration Configuration
58     * @param service Centralized service
59     * @param exceptionHandler handler for uncaught exception. Will
60     *                         terminate job.
61     */
62    public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
63                             ImmutableClassesGiraphConfiguration configuration,
64                             CentralizedServiceMaster<?, ?, ?> service,
65                             Thread.UncaughtExceptionHandler exceptionHandler) {
66      this.nettyClient =
67          new NettyClient(context, configuration, service.getMasterInfo(),
68              exceptionHandler);
69      this.service = service;
70      this.progressable = context;
71      maxBytesPerAggregatorRequest = configuration.getInt(
72          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST,
73          AggregatorUtils.MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT);
74    }
75  
76    @Override
77    public void openConnections() {
78      nettyClient.connectAllAddresses(service.getWorkerInfoList());
79    }
80  
81    @Override
82    public void sendToOwner(String name, GlobalCommType sendType, Writable object)
83      throws IOException {
84      WorkerInfo owner =
85          AggregatorUtils.getOwner(name, service.getWorkerInfoList());
86      int currentSize = sendGlobalCommCache.addValue(owner.getTaskId(),
87          name, sendType, object);
88      if (currentSize >= maxBytesPerAggregatorRequest) {
89        flushAggregatorsToWorker(owner);
90      }
91    }
92  
93    @Override
94    public void finishSendingValues() throws IOException {
95      for (WorkerInfo worker : service.getWorkerInfoList()) {
96        sendGlobalCommCache.addSpecialCount(worker.getTaskId());
97        flushAggregatorsToWorker(worker);
98        progressable.progress();
99      }
100     sendGlobalCommCache.reset();
101   }
102 
103   /**
104    * Send aggregators from cache to worker.
105    *
106    * @param worker Worker which we want to send aggregators to
107    */
108   private void flushAggregatorsToWorker(WorkerInfo worker) {
109     byte[] data =
110         sendGlobalCommCache.removeSerialized(worker.getTaskId());
111     nettyClient.sendWritableRequest(
112         worker.getTaskId(), new SendAggregatorsToOwnerRequest(data,
113           service.getMasterInfo().getTaskId()));
114   }
115 
116   @Override
117   public void flush() {
118     nettyClient.waitAllRequests();
119   }
120 
121   @Override
122   public void sendWritableRequest(int destTaskId, WritableRequest request) {
123     nettyClient.sendWritableRequest(destTaskId, request);
124   }
125 
126   @Override
127   public void closeConnections() {
128     nettyClient.stop();
129   }
130 
131   @Override
132   public FlowControl getFlowControl() {
133     return nettyClient.getFlowControl();
134   }
135 }