001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.peer;
020 import javax.jms.IllegalStateException;
021 import javax.jms.JMSException;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024 import org.activemq.broker.BrokerConnector;
025 import org.activemq.broker.BrokerContainer;
026 import org.activemq.broker.impl.BrokerConnectorImpl;
027 import org.activemq.broker.impl.BrokerContainerImpl;
028 import org.activemq.io.WireFormat;
029 import org.activemq.store.vm.VMPersistenceAdapter;
030 import org.activemq.transport.DiscoveryNetworkConnector;
031 import org.activemq.transport.NetworkConnector;
032 import org.activemq.transport.TransportChannel;
033 import org.activemq.transport.multicast.MulticastDiscoveryAgent;
034 import org.activemq.transport.vm.VmTransportChannel;
035 import org.activemq.util.IdGenerator;
036 import org.activemq.util.URIHelper;
037
038 /**
039 * A <CODE>PeerTransportChannel</CODE> creates an embedded broker and networks peers together to form a P-2-P network.
040 * <P>
041 * By default, <CODE>PeerTransportChannel</CODE> uses discovery to locate other peers, and uses a well known service
042 * name on the discovery
043 * <P>
044 * An example of the expected format is: <CODE>peer://development.net</CODE> where development.net is the service name
045 * used in discovery
046 * <P>
047 *
048 * @version $Revision: 1.1.1.1 $
049 */
050 public class PeerTransportChannel extends VmTransportChannel {
051 private static final Log log = LogFactory.getLog(PeerTransportChannel.class);
052 protected static final String DEFAULT_BROKER_CONNECTOR_URI = "tcp://localhost:0";
053 protected WireFormat wireFormat;
054 protected TransportChannel channel;
055 protected String discoveryURI;
056 protected String remoteUserName;
057 protected String remotePassword;
058 protected String brokerName;
059 protected boolean doDiscovery;
060 protected String peerURIs;
061 protected String brokerConnectorURI;
062 protected String serviceName;
063 protected BrokerConnector brokerConnector;
064 protected boolean remote;
065 protected boolean persistent=true;
066
067
068 /**
069 * Construct a PeerTransportChannel
070 *
071 * @param wireFormat
072 * @param serviceName
073 * @throws JMSException
074 */
075 protected PeerTransportChannel(WireFormat wireFormat, String serviceName) throws JMSException {
076 this.wireFormat = wireFormat;
077 this.serviceName = serviceName;
078 this.discoveryURI = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI;
079 IdGenerator idGen = new IdGenerator();
080 this.brokerName = idGen.generateId();
081 this.brokerConnectorURI = DEFAULT_BROKER_CONNECTOR_URI;
082 this.doDiscovery = true;
083 if (serviceName == null || serviceName.length() == 0) {
084 throw new IllegalStateException("No service name specified for peer:// protocol");
085 }
086 }
087
088 /**
089 * @return true if the transport channel is active, this value will be false through reconnecting
090 */
091 public boolean isTransportConnected() {
092 return true;
093 }
094
095 /**
096 * Some transports rely on an embedded broker (beer based protocols)
097 *
098 * @return true if an embedded broker required
099 */
100 public boolean requiresEmbeddedBroker() {
101 return true;
102 }
103
104 /**
105 * Some transports that rely on an embedded broker need to create the connector used by the broker
106 *
107 * @return the BrokerConnector or null if not applicable
108 * @throws JMSException
109 */
110 public BrokerConnector getEmbeddedBrokerConnector() throws JMSException {
111 try {
112 if (brokerConnector == null) {
113 BrokerContainer container = new BrokerContainerImpl(brokerName, serviceName);
114 if( !persistent ) {
115 container.setPersistenceAdapter(new VMPersistenceAdapter());
116 }
117 NetworkConnector networkConnector = null;
118 if (doDiscovery) {
119 networkConnector = new DiscoveryNetworkConnector(container);
120 MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(serviceName);
121 container.setDiscoveryAgent(agent);
122 }
123 if (peerURIs != null && peerURIs.length() > 0) {
124 URIHelper peers = new URIHelper(peerURIs);
125 networkConnector = createNetworkConnector(container);
126 while (peers.hasNext()) {
127 String peerURL = peers.getNext();
128 networkConnector.addNetworkChannel(peerURL);
129 }
130 }
131 container.addNetworkConnector(networkConnector);
132 URIHelper helper = new URIHelper(brokerConnectorURI);
133 brokerConnector = new BrokerConnectorImpl(container, helper.getNext(), wireFormat);
134 while (helper.hasNext()) {
135 new BrokerConnectorImpl(container, helper.getNext(), wireFormat);
136 }
137 container.start();
138 }
139 return brokerConnector;
140 }
141 catch (Exception e) {
142 e.printStackTrace();
143 String errorStr = "Failed to get embedded connector";
144 log.error(errorStr, e);
145 JMSException jmsEx = new JMSException(errorStr);
146 jmsEx.setLinkedException(e);
147 throw jmsEx;
148 }
149 }
150
151 /**
152 * Create a NetworkConnector
153 * @param container
154 * @return the NetworkConnector
155 */
156 protected NetworkConnector createNetworkConnector(BrokerContainer container){
157 return new NetworkConnector(container);
158 }
159
160 /**
161 * @return Returns the brokerDiscoveryURI.
162 */
163 public String getDiscoveryURI() {
164 return discoveryURI;
165 }
166
167 /**
168 * @param discoveryURI The brokerDiscoveryURI to set.
169 */
170 public void setDiscoveryURI(String discoveryURI) {
171 this.discoveryURI = discoveryURI;
172 }
173
174 /**
175 * @return Returns the brokerName.
176 */
177 public String getBrokerName() {
178 return brokerName;
179 }
180
181 /**
182 * @param brokerName The brokerName to set.
183 */
184 public void setBrokerName(String brokerName) {
185 this.brokerName = brokerName;
186 }
187
188 /**
189 * @return Returns the doDiscovery.
190 */
191 public boolean isDoDiscovery() {
192 return doDiscovery;
193 }
194
195 /**
196 * @param doDiscovery The doDiscovery to set.
197 */
198 public void setDoDiscovery(boolean doDiscovery) {
199 this.doDiscovery = doDiscovery;
200 }
201
202 /**
203 * @return Returns the wireFormat.
204 */
205 public WireFormat getWireFormat() {
206 return wireFormat;
207 }
208
209 /**
210 * @param wireFormat The wireFormat to set.
211 */
212 public void setWireFormat(WireFormat wireFormat) {
213 this.wireFormat = wireFormat;
214 }
215
216 /**
217 * @return Returns the remotePassword.
218 */
219 public String getRemotePassword() {
220 return remotePassword;
221 }
222
223 /**
224 * @param remotePassword The remotePassword to set.
225 */
226 public void setRemotePassword(String remotePassword) {
227 this.remotePassword = remotePassword;
228 }
229
230 /**
231 * @return Returns the remoteUserName.
232 */
233 public String getRemoteUserName() {
234 return remoteUserName;
235 }
236
237 /**
238 * @param remoteUserName The remoteUserName to set.
239 */
240 public void setRemoteUserName(String remoteUserName) {
241 this.remoteUserName = remoteUserName;
242 }
243
244 /**
245 * @return Returns the brokerConnectorURI.
246 */
247 public String getBrokerConnectorURI() {
248 return brokerConnectorURI;
249 }
250
251 /**
252 * @param brokerConnectorURI The brokerConnectorURI to set.
253 */
254 public void setBrokerConnectorURI(String brokerConnectorURI) {
255 this.brokerConnectorURI = brokerConnectorURI;
256 }
257
258 /**
259 * @return Returns the peerURIs.
260 */
261 public String getPeerURIs() {
262 return peerURIs;
263 }
264
265 /**
266 * @param peerURIs The peerURIs to set.
267 */
268 public void setPeerURIs(String peerURIs) {
269 this.peerURIs = peerURIs;
270 }
271
272 /**
273 * @return Returns the serviceName.
274 */
275 public String getServiceName() {
276 return serviceName;
277 }
278
279 /**
280 * @param serviceName The serviceName to set.
281 */
282 public void setServiceName(String serviceName) {
283 this.serviceName = serviceName;
284 }
285
286 /**
287 * @return Returns the remote.
288 */
289 public boolean isRemote() {
290 return remote;
291 }
292 /**
293 * @param remote The remote to set.
294 */
295 public void setRemote(boolean remote) {
296 this.remote = remote;
297 }
298
299 /**
300 * @return Returns the persistent.
301 */
302 public boolean isPersistent() {
303 return persistent;
304 }
305 /**
306 * @param persistent The persistent to set.
307 */
308 public void setPersistent(boolean persistent) {
309 this.persistent = persistent;
310 }
311 }