001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport;
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import javax.jms.JMSException;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.activemq.broker.BrokerContainer;
026 import org.activemq.broker.impl.BrokerConnectorImpl;
027 import org.activemq.io.impl.DefaultWireFormat;
028 import org.activemq.message.BrokerInfo;
029 import org.activemq.transport.composite.CompositeTransportChannel;
030 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
031
032 /**
033 * Represents a Boondocks broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
034 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
035 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
036 * broker.
037 *
038 * @version $Revision: 1.1.1.1 $
039 */
040 public class RemoteNetworkChannel extends NetworkChannel implements TransportStatusEventListener {
041 private static final Log log = LogFactory.getLog(RemoteNetworkChannel.class);
042 private TransportChannel boondocksChannel;
043
044 /**
045 * Default Constructor
046 *
047 * @param tp
048 */
049 public RemoteNetworkChannel(PooledExecutor tp) {
050 super(tp);
051 }
052
053 /**
054 * Constructor
055 *
056 * @param connector
057 * @param brokerContainer
058 * @param uri
059 */
060 public RemoteNetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
061 super(connector,brokerContainer,uri);
062 }
063
064
065 /**
066 * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
067 */
068 public void statusChanged(TransportStatusEvent event) {
069 if (event.getTransportChannel() == boondocksChannel) {
070 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
071 try {
072 sendBrokerInfo();
073 }
074 catch (JMSException e) {
075 log.error("Failed to send Broker Info", e);
076 }
077 }
078 }
079 else {
080 super.statusChanged(event);
081 }
082 }
083
084
085 /**
086 * remote:// can only make outgoing connections - we assume we can't
087 * accept incomming (duck!). So we initialize the transport channel
088 * from this side and create the broker client as well
089 * @throws JMSException
090 */
091
092 protected void initialize() throws JMSException {
093 super.initialize();
094 try {
095 boondocksChannel = TransportChannelProvider.create(new DefaultWireFormat(), new URI(uri));
096 boondocksChannel.addTransportStatusEventListener(this);
097 if (boondocksChannel instanceof CompositeTransportChannel) {
098 CompositeTransportChannel composite = (CompositeTransportChannel)boondocksChannel;
099 composite.setMaximumRetries(maximumRetries);
100 composite.setFailureSleepTime(reconnectSleepTime);
101 composite.setIncrementTimeout(false);
102 }
103 boondocksChannel.start();
104 //create our own broker connector ...
105 BrokerConnectorImpl connector = new BrokerConnectorImpl(getBrokerContainer(),"vm://uri",new DefaultWireFormat());
106 connector.start();
107 connector.addClient(boondocksChannel);
108 sendBrokerInfo();
109 }
110 catch (URISyntaxException e) {
111 log.error("Could not parse uri: " + uri + " to make remote connector",e);
112 }
113 }
114
115 private void sendBrokerInfo() throws JMSException{
116 //inform the other side we are a remote channel
117 if (boondocksChannel != null) {
118 BrokerInfo info = new BrokerInfo();
119 info.setBrokerName(brokerContainer.getBroker().getBrokerName());
120 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
121 info.setRemote(true);
122 boondocksChannel.asyncSend(info);
123 }
124 }
125
126
127 }