001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025
026 import javax.jms.JMSException;
027
028 import org.activemq.ActiveMQPrefetchPolicy;
029 import org.activemq.broker.BrokerContainer;
030 import org.activemq.service.Service;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
035
036 /**
037 * Represents a connector to one or more remote brokers.
038 * This class manages a number of {@link NetworkChannel} instances
039 * which may or may not be connected to a
040 * remote broker at any point in time.
041 * <p/>
042 * The implementation of this class could use a fixed number of
043 * configured {@link NetworkChannel} instances or could use
044 * discovery to find them.
045 *
046 * @version $Revision: 1.1.1.1 $
047 */
048 public class NetworkConnector implements Service {
049 private static final Log log = LogFactory.getLog(NetworkConnector.class);
050
051 private BrokerContainer brokerContainer;
052 private List networkChannels = new ArrayList();
053 private Map localDetails = new HashMap();
054 private String remoteUserName;
055 private String remotePassword;
056 protected PooledExecutor threadPool;
057 private ActiveMQPrefetchPolicy localPrefetchPolicy = new ActiveMQPrefetchPolicy();
058 private ActiveMQPrefetchPolicy remotePrefetchPolicy = new ActiveMQPrefetchPolicy();
059
060
061 public NetworkConnector(BrokerContainer brokerContainer) {
062 this.brokerContainer = brokerContainer;
063 this.threadPool = new PooledExecutor();
064 }
065
066 public void start() throws JMSException {
067 for (Iterator iter = networkChannels.iterator(); iter.hasNext();) {
068 NetworkChannel networkChannel = (NetworkChannel) iter.next();
069 networkChannel.setBrokerContainer(getBrokerContainer());
070 networkChannel.setThreadPool(threadPool);
071 networkChannel.start();
072 }
073 }
074
075 public void stop() throws JMSException {
076 for (Iterator iter = networkChannels.iterator(); iter.hasNext();) {
077 NetworkChannel networkChannel = (NetworkChannel) iter.next();
078 try {
079 networkChannel.stop();
080 }
081 catch (JMSException e) {
082 log.warn("Failed to stop network channel: " + e, e);
083 }
084 }
085 }
086
087 public void setTransportChannelListener(TransportChannelListener listener) {
088 }
089
090
091
092
093 // Properties
094 //-------------------------------------------------------------------------
095
096 public BrokerContainer getBrokerContainer() {
097 return brokerContainer;
098 }
099
100
101 /**
102 * @return Returns the threadPool.
103 */
104 public PooledExecutor getThreadPool() {
105 return threadPool;
106 }
107
108
109 public List getNetworkChannels() {
110 return networkChannels;
111 }
112
113 public Map getLocalDetails() {
114 return localDetails;
115 }
116
117 public void setLocalDetails(Map localDetails) {
118 this.localDetails = localDetails;
119 }
120
121 public String getRemotePassword() {
122 return remotePassword;
123 }
124
125 public void setRemotePassword(String remotePassword) {
126 this.remotePassword = remotePassword;
127 }
128
129 public String getRemoteUserName() {
130 return remoteUserName;
131 }
132
133 public void setRemoteUserName(String remoteUserName) {
134 this.remoteUserName = remoteUserName;
135 }
136
137
138 /**
139 * Sets a list of {@link NetworkChannel} instances
140 *
141 * @param networkChannels
142 */
143 public void setNetworkChannels(List networkChannels) {
144 this.networkChannels = networkChannels;
145 }
146
147 /**
148 * Adds a new network channel for the given URI
149 *
150 * @param uri
151 * @return
152 */
153 public NetworkChannel addNetworkChannel(String uri) throws JMSException {
154 NetworkChannel networkChannel = createNetworkChannel(uri);
155 addNetworkChannel(networkChannel);
156 return networkChannel;
157 }
158
159
160 /**
161 * Adds a new network channel
162 */
163 public void addNetworkChannel(NetworkChannel networkChannel) throws JMSException {
164 configure(networkChannel);
165 networkChannels.add(networkChannel);
166 }
167
168 /**
169 * Removes a network channel
170 */
171 public void removeNetworkChannel(NetworkChannel networkChannel) {
172 networkChannels.remove(networkChannel);
173 }
174
175 public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
176 return localPrefetchPolicy;
177 }
178
179 public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
180 this.localPrefetchPolicy = localPrefetchPolicy;
181 }
182
183 public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
184 return remotePrefetchPolicy;
185 }
186
187 public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
188 this.remotePrefetchPolicy = remotePrefetchPolicy;
189 }
190
191
192 // Expose individual prefetch properties as individual properties
193 //-------------------------------------------------------------------------
194 public int getLocalDurableTopicPrefetch() {
195 return localPrefetchPolicy.getDurableTopicPrefetch();
196 }
197
198 public void setLocalDurableTopicPrefetch(int durableTopicPrefetch) {
199 localPrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch);
200 }
201
202 public int getLocalQueuePrefetch() {
203 return localPrefetchPolicy.getQueuePrefetch();
204 }
205
206 public void setLocalQueuePrefetch(int queuePrefetch) {
207 localPrefetchPolicy.setQueuePrefetch(queuePrefetch);
208 }
209
210 public int getLocalQueueBrowserPrefetch() {
211 return localPrefetchPolicy.getQueueBrowserPrefetch();
212 }
213
214 public void setLocalQueueBrowserPrefetch(int queueBrowserPrefetch) {
215 localPrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch);
216 }
217
218 public int getLocalTopicPrefetch() {
219 return localPrefetchPolicy.getTopicPrefetch();
220 }
221
222 public void setLocalTopicPrefetch(int topicPrefetch) {
223 localPrefetchPolicy.setTopicPrefetch(topicPrefetch);
224 }
225
226
227 public int getRemoteDurableTopicPrefetch() {
228 return remotePrefetchPolicy.getDurableTopicPrefetch();
229 }
230
231 public void setRemoteDurableTopicPrefetch(int durableTopicPrefetch) {
232 remotePrefetchPolicy.setDurableTopicPrefetch(durableTopicPrefetch);
233 }
234
235 public int getRemoteQueuePrefetch() {
236 return remotePrefetchPolicy.getQueuePrefetch();
237 }
238
239 public void setRemoteQueuePrefetch(int queuePrefetch) {
240 remotePrefetchPolicy.setQueuePrefetch(queuePrefetch);
241 }
242
243 public int getRemoteQueueBrowserPrefetch() {
244 return remotePrefetchPolicy.getQueueBrowserPrefetch();
245 }
246
247 public void setRemoteQueueBrowserPrefetch(int queueBrowserPrefetch) {
248 remotePrefetchPolicy.setQueueBrowserPrefetch(queueBrowserPrefetch);
249 }
250
251 public int getRemoteTopicPrefetch() {
252 return remotePrefetchPolicy.getTopicPrefetch();
253 }
254
255 public void setRemoteTopicPrefetch(int topicPrefetch) {
256 remotePrefetchPolicy.setTopicPrefetch(topicPrefetch);
257 }
258
259
260 // Implementation methods
261 //-------------------------------------------------------------------------
262
263
264 /**
265 * Create a channel from the url
266 * @param url
267 * @return
268 */
269 protected NetworkChannel createNetworkChannel(String url) {
270 NetworkChannel answer = new NetworkChannel(this,getBrokerContainer(), url);
271 answer.setRemoteUserName(getRemoteUserName());
272 answer.setRemotePassword(getRemotePassword());
273 return answer;
274 }
275
276 /**
277 * Performs any network connector based configuration; such as setting the dispatch policies
278 *
279 * @param networkChannel
280 */
281 protected void configure(NetworkChannel networkChannel) throws JMSException {
282 networkChannel.setLocalPrefetchPolicy(localPrefetchPolicy);
283 networkChannel.setRemotePrefetchPolicy(remotePrefetchPolicy);
284 }
285
286 }