This project has retired. For details please refer to its Attic page.
ChannelRotater xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.net.InetSocketAddress;
22  import java.util.List;
23  import com.google.common.collect.Lists;
24  
25  import io.netty.channel.Channel;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import org.apache.log4j.Logger;
29  
30  
31  /**
32   * Maintains multiple channels and rotates between them.  This is thread-safe.
33   */
34  public class ChannelRotater {
35    /** Logger */
36    private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
37    /** Index of last used channel */
38    private int index = 0;
39    /** Channel list */
40    private final List<Channel> channelList = Lists.newArrayList();
41    /** Task id of this channel */
42    private final Integer taskId;
43    /** Address these channels are associated with */
44    private final InetSocketAddress address;
45  
46    /**
47     * Constructor
48     *
49     * @param taskId Id of the task these channels as associated with
50     * @param address Address these channels are associated with
51     */
52    public ChannelRotater(Integer taskId, InetSocketAddress address) {
53      this.taskId = taskId;
54      this.address = address;
55    }
56  
57    public Integer getTaskId() {
58      return taskId;
59    }
60  
61    /**
62     * Add a channel to the rotation
63     *
64     * @param channel Channel to add
65     */
66    public synchronized void addChannel(Channel channel) {
67      synchronized (channelList) {
68        channelList.add(channel);
69      }
70    }
71  
72    /**
73     * Get the next channel
74     *
75     * @return Next channel
76     */
77    public synchronized Channel nextChannel() {
78      if (channelList.isEmpty()) {
79        LOG.warn("nextChannel: No channels exist for hostname " +
80          address.getHostName());
81        return null;
82      }
83  
84      ++index;
85      if (index >= channelList.size()) {
86        index = 0;
87      }
88      return channelList.get(index);
89    }
90  
91    /**
92     * Remove the a channel
93     *
94     * @param channel Channel to remove
95     * @return Return true if successful, false otherwise
96     */
97    public synchronized boolean removeChannel(Channel channel) {
98      boolean success = channelList.remove(channel);
99      if (index >= channelList.size()) {
100       index = 0;
101     }
102     return success;
103   }
104 
105   /**
106    * Get the number of channels in this object
107    *
108    * @return Number of channels
109    */
110   public synchronized int size() {
111     return channelList.size();
112   }
113 
114   /**
115    * Close the channels
116    *
117    * @param channelFutureListener If desired, pass in a channel future listener
118    */
119   public synchronized void closeChannels(
120       ChannelFutureListener channelFutureListener) {
121     for (Channel channel : channelList) {
122       ChannelFuture channelFuture = channel.close();
123       if (channelFutureListener != null) {
124         channelFuture.addListener(channelFutureListener);
125       }
126     }
127   }
128 
129   /**
130    * Get a copy of the channels
131    *
132    * @return Copy of the channels
133    */
134   public synchronized Iterable<Channel> getChannels() {
135     return Lists.newArrayList(channelList);
136   }
137 }