1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.giraph.comm.netty;
20
21 import java.net.InetSocketAddress;
22 import java.util.List;
23 import com.google.common.collect.Lists;
24
25 import io.netty.channel.Channel;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import org.apache.log4j.Logger;
29
30
31 /**
32 * Maintains multiple channels and rotates between them. This is thread-safe.
33 */
34 public class ChannelRotater {
35 /** Logger */
36 private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
37 /** Index of last used channel */
38 private int index = 0;
39 /** Channel list */
40 private final List<Channel> channelList = Lists.newArrayList();
41 /** Task id of this channel */
42 private final Integer taskId;
43 /** Address these channels are associated with */
44 private final InetSocketAddress address;
45
46 /**
47 * Constructor
48 *
49 * @param taskId Id of the task these channels as associated with
50 * @param address Address these channels are associated with
51 */
52 public ChannelRotater(Integer taskId, InetSocketAddress address) {
53 this.taskId = taskId;
54 this.address = address;
55 }
56
57 public Integer getTaskId() {
58 return taskId;
59 }
60
61 /**
62 * Add a channel to the rotation
63 *
64 * @param channel Channel to add
65 */
66 public synchronized void addChannel(Channel channel) {
67 synchronized (channelList) {
68 channelList.add(channel);
69 }
70 }
71
72 /**
73 * Get the next channel
74 *
75 * @return Next channel
76 */
77 public synchronized Channel nextChannel() {
78 if (channelList.isEmpty()) {
79 LOG.warn("nextChannel: No channels exist for hostname " +
80 address.getHostName());
81 return null;
82 }
83
84 ++index;
85 if (index >= channelList.size()) {
86 index = 0;
87 }
88 return channelList.get(index);
89 }
90
91 /**
92 * Remove the a channel
93 *
94 * @param channel Channel to remove
95 * @return Return true if successful, false otherwise
96 */
97 public synchronized boolean removeChannel(Channel channel) {
98 boolean success = channelList.remove(channel);
99 if (index >= channelList.size()) {
100 index = 0;
101 }
102 return success;
103 }
104
105 /**
106 * Get the number of channels in this object
107 *
108 * @return Number of channels
109 */
110 public synchronized int size() {
111 return channelList.size();
112 }
113
114 /**
115 * Close the channels
116 *
117 * @param channelFutureListener If desired, pass in a channel future listener
118 */
119 public synchronized void closeChannels(
120 ChannelFutureListener channelFutureListener) {
121 for (Channel channel : channelList) {
122 ChannelFuture channelFuture = channel.close();
123 if (channelFutureListener != null) {
124 channelFuture.addListener(channelFutureListener);
125 }
126 }
127 }
128
129 /**
130 * Get a copy of the channels
131 *
132 * @return Copy of the channels
133 */
134 public synchronized Iterable<Channel> getChannels() {
135 return Lists.newArrayList(channelList);
136 }
137 }