View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.giraph.comm.netty;
20  
21  import java.util.List;
22  import com.google.common.collect.Lists;
23  
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  
28  
29  /**
30   * Maintains multiple channels and rotates between them.  This is thread-safe.
31   */
32  public class ChannelRotater {
33    /** Index of last used channel */
34    private int index = 0;
35    /** Channel list */
36    private final List<Channel> channelList = Lists.newArrayList();
37    /** Task id of this channel */
38    private final Integer taskId;
39  
40    /**
41     * Constructor
42     *
43     * @param taskId Id of the task these channels as associated with
44     */
45    public ChannelRotater(Integer taskId) {
46      this.taskId = taskId;
47    }
48  
49    public Integer getTaskId() {
50      return taskId;
51    }
52  
53    /**
54     * Add a channel to the rotation
55     *
56     * @param channel Channel to add
57     */
58    public synchronized void addChannel(Channel channel) {
59      synchronized (channelList) {
60        channelList.add(channel);
61      }
62    }
63  
64    /**
65     * Get the next channel
66     *
67     * @return Next channel
68     */
69    public synchronized Channel nextChannel() {
70      if (channelList.isEmpty()) {
71        throw new IllegalArgumentException("nextChannel: No channels exist!");
72      }
73  
74      ++index;
75      if (index >= channelList.size()) {
76        index = 0;
77      }
78      return channelList.get(index);
79    }
80  
81    /**
82     * Remove the a channel
83     *
84     * @param channel Channel to remove
85     * @return Return true if successful, false otherwise
86     */
87    public synchronized boolean removeChannel(Channel channel) {
88      boolean success = channelList.remove(channel);
89      if (index >= channelList.size()) {
90        index = 0;
91      }
92      return success;
93    }
94  
95    /**
96     * Get the number of channels in this object
97     *
98     * @return Number of channels
99     */
100   public synchronized int size() {
101     return channelList.size();
102   }
103 
104   /**
105    * Close the channels
106    *
107    * @param channelFutureListener If desired, pass in a channel future listener
108    */
109   public synchronized void closeChannels(
110       ChannelFutureListener channelFutureListener) {
111     for (Channel channel : channelList) {
112       ChannelFuture channelFuture = channel.close();
113       if (channelFutureListener != null) {
114         channelFuture.addListener(channelFutureListener);
115       }
116     }
117   }
118 
119   /**
120    * Get a copy of the channels
121    *
122    * @return Copy of the channels
123    */
124   public synchronized Iterable<Channel> getChannels() {
125     return Lists.newArrayList(channelList);
126   }
127 }