View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.net.InetSocketAddress;
22  import java.util.List;
23  import com.google.common.collect.Lists;
24  
25  import io.netty.channel.Channel;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  
29  
30  /**
31   * Maintains multiple channels and rotates between them.  This is thread-safe.
32   */
33  public class ChannelRotater {
34    /** Index of last used channel */
35    private int index = 0;
36    /** Channel list */
37    private final List<Channel> channelList = Lists.newArrayList();
38    /** Task id of this channel */
39    private final Integer taskId;
40    /** Address these channels are associated with */
41    private final InetSocketAddress address;
42  
43    /**
44     * Constructor
45     *
46     * @param taskId Id of the task these channels as associated with
47     * @param address Address these channels are associated with
48     */
49    public ChannelRotater(Integer taskId, InetSocketAddress address) {
50      this.taskId = taskId;
51      this.address = address;
52    }
53  
54    public Integer getTaskId() {
55      return taskId;
56    }
57  
58    /**
59     * Add a channel to the rotation
60     *
61     * @param channel Channel to add
62     */
63    public synchronized void addChannel(Channel channel) {
64      synchronized (channelList) {
65        channelList.add(channel);
66      }
67    }
68  
69    /**
70     * Get the next channel
71     *
72     * @return Next channel
73     */
74    public synchronized Channel nextChannel() {
75      if (channelList.isEmpty()) {
76        throw new IllegalArgumentException(
77            "nextChannel: No channels exist for hostname " +
78                address.getHostName());
79      }
80  
81      ++index;
82      if (index >= channelList.size()) {
83        index = 0;
84      }
85      return channelList.get(index);
86    }
87  
88    /**
89     * Remove the a channel
90     *
91     * @param channel Channel to remove
92     * @return Return true if successful, false otherwise
93     */
94    public synchronized boolean removeChannel(Channel channel) {
95      boolean success = channelList.remove(channel);
96      if (index >= channelList.size()) {
97        index = 0;
98      }
99      return success;
100   }
101 
102   /**
103    * Get the number of channels in this object
104    *
105    * @return Number of channels
106    */
107   public synchronized int size() {
108     return channelList.size();
109   }
110 
111   /**
112    * Close the channels
113    *
114    * @param channelFutureListener If desired, pass in a channel future listener
115    */
116   public synchronized void closeChannels(
117       ChannelFutureListener channelFutureListener) {
118     for (Channel channel : channelList) {
119       ChannelFuture channelFuture = channel.close();
120       if (channelFutureListener != null) {
121         channelFuture.addListener(channelFutureListener);
122       }
123     }
124   }
125 
126   /**
127    * Get a copy of the channels
128    *
129    * @return Copy of the channels
130    */
131   public synchronized Iterable<Channel> getChannels() {
132     return Lists.newArrayList(channelList);
133   }
134 }