001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020
021 import java.util.Iterator;
022 import java.util.Map;
023
024 import javax.jms.Connection;
025 import javax.jms.ConnectionConsumer;
026 import javax.jms.ConnectionMetaData;
027 import javax.jms.DeliveryMode;
028 import javax.jms.Destination;
029 import javax.jms.ExceptionListener;
030 import javax.jms.IllegalStateException;
031 import javax.jms.JMSException;
032 import javax.jms.Queue;
033 import javax.jms.QueueConnection;
034 import javax.jms.QueueSession;
035 import javax.jms.ServerSessionPool;
036 import javax.jms.Session;
037 import javax.jms.Topic;
038 import javax.jms.TopicConnection;
039 import javax.jms.TopicSession;
040 import javax.jms.XAConnection;
041
042 import org.activemq.advisories.TempDestinationAdvisor;
043 import org.activemq.advisories.TempDestinationAdvisoryEvent;
044 import org.activemq.capacity.CapacityMonitorEvent;
045 import org.activemq.capacity.CapacityMonitorEventListener;
046 import org.activemq.filter.AndFilter;
047 import org.activemq.filter.Filter;
048 import org.activemq.filter.FilterFactory;
049 import org.activemq.filter.FilterFactoryImpl;
050 import org.activemq.filter.NoLocalFilter;
051 import org.activemq.io.util.ByteArrayCompression;
052 import org.activemq.io.util.ByteArrayFragmentation;
053 import org.activemq.io.util.MemoryBoundedObjectManager;
054 import org.activemq.io.util.MemoryBoundedQueue;
055 import org.activemq.io.util.MemoryBoundedQueueManager;
056 import org.activemq.management.JMSConnectionStatsImpl;
057 import org.activemq.management.JMSStatsImpl;
058 import org.activemq.management.StatsCapable;
059 import org.activemq.management.StatsImpl;
060 import org.activemq.message.ActiveMQDestination;
061 import org.activemq.message.ActiveMQMessage;
062 import org.activemq.message.ActiveMQObjectMessage;
063 import org.activemq.message.BrokerAdminCommand;
064 import org.activemq.message.CapacityInfo;
065 import org.activemq.message.CleanupConnectionInfo;
066 import org.activemq.message.ConnectionInfo;
067 import org.activemq.message.ConsumerInfo;
068 import org.activemq.message.Packet;
069 import org.activemq.message.PacketListener;
070 import org.activemq.message.ProducerInfo;
071 import org.activemq.message.Receipt;
072 import org.activemq.message.ResponseReceipt;
073 import org.activemq.message.SessionInfo;
074 import org.activemq.message.TransactionInfo;
075 import org.activemq.message.WireFormatInfo;
076 import org.activemq.message.XATransactionInfo;
077 import org.activemq.transport.TransportChannel;
078 import org.activemq.transport.TransportStatusEvent;
079 import org.activemq.transport.TransportStatusEventListener;
080 import org.activemq.util.IdGenerator;
081 import org.activemq.util.JMSExceptionHelper;
082 import org.apache.commons.logging.Log;
083 import org.apache.commons.logging.LogFactory;
084
085 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
086 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
087 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
088 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
089
090 /**
091 * A <CODE>Connection</CODE> object is a client's active connection to its JMS
092 * provider. It typically allocates provider resources outside the Java virtual
093 * machine (JVM).
094 * <P>
095 * Connections support concurrent use.
096 * <P>
097 * A connection serves several purposes:
098 * <UL>
099 * <LI>It encapsulates an open connection with a JMS provider. It typically
100 * represents an open TCP/IP socket between a client and the service provider
101 * software.
102 * <LI>Its creation is where client authentication takes place.
103 * <LI>It can specify a unique client identifier.
104 * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
105 * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
106 * </UL>
107 * <P>
108 * Because the creation of a connection involves setting up authentication and
109 * communication, a connection is a relatively heavyweight object. Most clients
110 * will do all their messaging with a single connection. Other more advanced
111 * applications may use several connections. The JMS API does not architect a
112 * reason for using multiple connections; however, there may be operational
113 * reasons for doing so.
114 * <P>
115 * A JMS client typically creates a connection, one or more sessions, and a
116 * number of message producers and consumers. When a connection is created, it
117 * is in stopped mode. That means that no messages are being delivered.
118 * <P>
119 * It is typical to leave the connection in stopped mode until setup is complete
120 * (that is, until all message consumers have been created). At that point, the
121 * client calls the connection's <CODE>start</CODE> method, and messages begin
122 * arriving at the connection's consumers. This setup convention minimizes any
123 * client confusion that may result from asynchronous message delivery while the
124 * client is still in the process of setting itself up.
125 * <P>
126 * A connection can be started immediately, and the setup can be done
127 * afterwards. Clients that do this must be prepared to handle asynchronous
128 * message delivery while they are still in the process of setting up.
129 * <P>
130 * A message producer can send messages while a connection is stopped. <p/>This
131 * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
132 * object is an active connection to a publish/subscribe JMS provider. A client
133 * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
134 * objects for producing and consuming messages.
135 * <P>
136 * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
137 * from which specialized topic-related objects can be created. A more general,
138 * and recommended approach is to use the <CODE>Connection </CODE> object.
139 * <P>
 * <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE>
141 * object is an active connection to a point-to-point JMS provider. A client
142 * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
143 * objects for producing and consuming messages.
144 * <P>
145 * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
146 * from which specialized queue-related objects can be created. A more general,
147 * and recommended, approach is to use the <CODE>Connection </CODE> object.
148 * <P>
149 * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
150 * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
151 * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
 * if used from <CODE>QueueConnection</CODE>.
153 *
154 * @version $Revision: 1.1.1.1 $
155 * @see javax.jms.Connection
156 * @see javax.jms.ConnectionFactory
157 * @see javax.jms.QueueConnection
158 * @see javax.jms.TopicConnection
159 * @see javax.jms.TopicConnectionFactory
160 * @see javax.jms.QueueConnection
161 * @see javax.jms.QueueConnectionFactory
162 */
163 public class ActiveMQConnection implements Connection, PacketListener,
164 ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
165 CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
166
/**
 * Default UserName for the Connection
 */
public static final String DEFAULT_USER = "defaultUser";

/**
 * Default URL for the ActiveMQ Broker
 */
public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

/**
 * Default client URL. If using a message broker in a hub(s)/spoke
 * architecture - use the DEFAULT_BROKER_URL
 *
 * @see ActiveMQConnection#DEFAULT_BROKER_URL
 */
public static final String DEFAULT_URL = "peer://development";

/**
 * Default Password for the Connection
 */
public static final String DEFAULT_PASSWORD = "defaultPassword";

private static final Log log = LogFactory.getLog(ActiveMQConnection.class);

// Limit handed to the MemoryBoundedObjectManager created in the constructor
// (10 * 1024 * 1024; presumably bytes - TODO confirm).
private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;

// properties

// Factory passed to the constructor; notified through onConnectionCreate()
// and onConnectionClose().
private ActiveMQConnectionFactory factory;

// Credentials supplied to the constructor.
private String userName;

private String password;

// Returned by getClientID() and assigned by setClientID().
protected String clientID;

// Timeout, in milliseconds, used by close() when sending the final
// connection info to the broker.
private int sendCloseTimeout = 2000;

// Channel used to talk to the broker; started lazily by checkClosed().
private TransportChannel transportChannel;

// Listener registered through setExceptionListener().
private ExceptionListener exceptionListener;

private ActiveMQPrefetchPolicy prefetchPolicy;

// Obtained from factory.getFactoryStats(); this connection is added in the
// constructor and removed again in close().
private JMSStatsImpl factoryStats;

// Created in the constructor; this connection is registered on it as a
// capacity event listener until close().
private MemoryBoundedObjectManager memoryManager;

// Queue manager built on top of memoryManager.
private MemoryBoundedQueueManager boundedQueueManager;

// Supplies consumer ids and consumer numbers in createConnectionConsumer().
protected IdGenerator handleIdGenerator;

private IdGenerator clientIdGenerator;

protected IdGenerator packetIdGenerator;

private IdGenerator sessionIdGenerator;

// Returned by getStats() and getConnectionStats().
private JMSConnectionStatsImpl stats;

// internal state

// Sessions iterated by start(), stop() and close().
private CopyOnWriteArrayList sessions;

// Cleared in close().
private CopyOnWriteArrayList messageDispatchers;

// Connection consumers closed and then cleared in close().
private CopyOnWriteArrayList connectionConsumers;

// Initialised to 0 in the constructor.
private SynchronizedInt consumerNumberGenerator;

// Returned by getMetaData().
private ActiveMQConnectionMetaData connectionMetaData;

// Set to true at the end of a successful close(); tested by checkClosed().
private boolean closed;

// Flipped to true by start() and back to false by stop() and close().
private SynchronizedBoolean started;

// When true, setClientID() refuses to change the client id.
private boolean clientIDSet;

// When true, setClientID() rejects the call because the connection has
// already been used.
private boolean isConnectionInfoSentToBroker;

// Set to true by the constructor that receives a TransportChannel.
private boolean isTransportOK;

// Set to true the first time checkClosed() starts the transport channel.
private boolean startedTransport;

// System.currentTimeMillis() captured at construction time.
private long startTime;

private long flowControlSleepTime = 0;

private boolean quickClose;

// Used for notifying that the connection is used for networks etc.;
// forwarded to transportChannel.setUsedInternally() in checkClosed().
private boolean internalConnection;

// Set to true by setClientID().
private boolean userSpecifiedClientID;

/**
 * Should we use an async send for persistent non transacted messages ?
 */
protected boolean useAsyncSend = true;

// Timeout, in milliseconds, passed to sendConnectionInfoToBroker() by start().
private int sendConnectionInfoTimeout = 30000;

private boolean disableTimeStampsByDefault = false;

private boolean J2EEcompliant = true;

private boolean prepareMessageBodyOnSend = true;

private boolean copyMessageOnSend = true;

// compression and fragmentation variables

private boolean doMessageCompression = true;

// Data size above which compression will be used.
private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;

private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;

// Default compression strategy.
private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;

private boolean doMessageFragmentation = true;

private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;

private boolean cachingEnabled = true;

private boolean optimizedMessageDispatch = false;

// Created in the constructor and cleared in close().
private CopyOnWriteArrayList transientConsumedRedeliverCache;

private FilterFactory filterFactory;

// Created as a ConcurrentHashMap in the constructor.
private Map tempDestinationMap;

// Created as a ConcurrentHashMap in the constructor; cleared in close().
private Map validDestinationsMap;

private String resourceManagerId;
312
313 /**
314 * A static helper method to create a new connection
315 *
316 * @return an ActiveMQConnection
317 * @throws JMSException
318 */
319 public static ActiveMQConnection makeConnection() throws JMSException {
320 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
321 return (ActiveMQConnection) factory.createConnection();
322 }
323
324 /**
325 * A static helper method to create a new connection
326 *
327 * @param uri
328 * @return and ActiveMQConnection
329 * @throws JMSException
330 */
331 public static ActiveMQConnection makeConnection(String uri)
332 throws JMSException {
333 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
334 return (ActiveMQConnection) factory.createConnection();
335 }
336
337 /**
338 * A static helper method to create a new connection
339 *
340 * @param user
341 * @param password
342 * @param uri
343 * @return an ActiveMQConnection
344 * @throws JMSException
345 */
346 public static ActiveMQConnection makeConnection(String user,
347 String password, String uri) throws JMSException {
348 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
349 password, uri);
350 return (ActiveMQConnection) factory.createConnection();
351 }
352
/**
 * Builds a connection on top of an existing TransportChannel and registers
 * this connection as the channel's packet listener, exception listener and
 * transport status listener.
 *
 * @param factory
 *            the factory creating this connection
 * @param theUserName
 *            the users name
 * @param thePassword
 *            the password
 * @param transportChannel
 *            the transport channel to communicate with the server
 * @throws JMSException
 */
public ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName,
        String thePassword, TransportChannel transportChannel) throws JMSException {
    this(factory, theUserName, thePassword);
    this.transportChannel = transportChannel;
    // Route everything the channel reports back through this connection.
    transportChannel.setPacketListener(this);
    transportChannel.setExceptionListener(this);
    transportChannel.addTransportStatusEventListener(this);
    isTransportOK = true;
}
376
377 protected ActiveMQConnection(ActiveMQConnectionFactory factory,
378 String theUserName, String thePassword) {
379 this.factory = factory;
380 this.userName = theUserName;
381 this.password = thePassword;
382 this.clientIdGenerator = new IdGenerator();
383 this.packetIdGenerator = new IdGenerator();
384 this.handleIdGenerator = new IdGenerator();
385 this.sessionIdGenerator = new IdGenerator();
386 this.consumerNumberGenerator = new SynchronizedInt(0);
387 this.sessions = new CopyOnWriteArrayList();
388 this.messageDispatchers = new CopyOnWriteArrayList();
389 this.connectionConsumers = new CopyOnWriteArrayList();
390 this.connectionMetaData = new ActiveMQConnectionMetaData();
391 this.started = new SynchronizedBoolean(false);
392 this.startTime = System.currentTimeMillis();
393 this.prefetchPolicy = new ActiveMQPrefetchPolicy();
394 this.memoryManager = new MemoryBoundedObjectManager(clientID,
395 DEFAULT_CONNECTION_MEMORY_LIMIT);
396 this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
397 this.memoryManager.addCapacityEventListener(this);
398 boolean transactional = this instanceof XAConnection;
399 factoryStats = factory.getFactoryStats();
400 factoryStats.addConnection(this);
401 stats = new JMSConnectionStatsImpl(sessions, transactional);
402 this.transientConsumedRedeliverCache = new CopyOnWriteArrayList();
403 this.tempDestinationMap = new ConcurrentHashMap();
404 this.validDestinationsMap = new ConcurrentHashMap();
405 factory.onConnectionCreate(this);
406 }
407
408 /**
409 * @return statistics for this Connection
410 */
411 public StatsImpl getStats() {
412 return stats;
413 }
414
415 /**
416 * @return a number unique for this connection
417 */
418 public JMSConnectionStatsImpl getConnectionStats() {
419 return stats;
420 }
421
422 /**
423 * Creates a <CODE>Session</CODE> object.
424 *
425 * @param transacted
426 * indicates whether the session is transacted
427 * @param acknowledgeMode
428 * indicates whether the consumer or the client will acknowledge
429 * any messages it receives; ignored if the session is
430 * transacted. Legal values are
431 * <code>Session.AUTO_ACKNOWLEDGE</code>,
432 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
433 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
434 * @return a newly created session
435 * @throws JMSException
436 * if the <CODE>Connection</CODE> object fails to create a
437 * session due to some internal error or lack of support for the
438 * specific transaction and acknowledgement mode.
439 * @see Session#AUTO_ACKNOWLEDGE
440 * @see Session#CLIENT_ACKNOWLEDGE
441 * @see Session#DUPS_OK_ACKNOWLEDGE
442 * @since 1.1
443 */
444 public Session createSession(boolean transacted, int acknowledgeMode)
445 throws JMSException {
446 checkClosed();
447 sendConnectionInfoToBroker();
448 return new ActiveMQSession(
449 this,
450 (transacted ? Session.SESSION_TRANSACTED
451 : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
452 : acknowledgeMode)));
453 }
454
455 /**
456 * Creates a <CODE>Session</CODE> object.
457 *
458 * @param transacted
459 * indicates whether the session is transacted
460 * @param acknowledgeMode
461 * indicates whether the consumer or the client will acknowledge
462 * any messages it receives; ignored if the session is
463 * transacted. Legal values are
464 * <code>Session.AUTO_ACKNOWLEDGE</code>,
465 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
466 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
467 * @param optimizedDispatch
468 * @return a newly created session
469 * @throws JMSException
470 * if the <CODE>Connection</CODE> object fails to create a
471 * session due to some internal error or lack of support for the
472 * specific transaction and acknowledgement mode.
473 * @see Session#AUTO_ACKNOWLEDGE
474 * @see Session#CLIENT_ACKNOWLEDGE
475 * @see Session#DUPS_OK_ACKNOWLEDGE
476 * @since 1.1
477 */
478 public Session createSession(boolean transacted, int acknowledgeMode,
479 boolean optimizedDispatch) throws JMSException {
480 checkClosed();
481 sendConnectionInfoToBroker();
482 return new ActiveMQSession(this,
483 (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
484 optimizedDispatch);
485 }
486
487 /**
488 * Gets the client identifier for this connection.
489 * <P>
490 * This value is specific to the JMS provider. It is either preconfigured by
491 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
492 * dynamically by the application by calling the <code>setClientID</code>
493 * method.
494 *
495 * @return the unique client identifier
496 * @throws JMSException
497 * if the JMS provider fails to return the client ID for this
498 * connection due to some internal error.
499 */
500 public String getClientID() throws JMSException {
501 checkClosed();
502 return this.clientID;
503 }
504
505 /**
506 * Sets the client identifier for this connection.
507 * <P>
508 * The preferred way to assign a JMS client's client identifier is for it to
509 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
510 * object and transparently assigned to the <CODE>Connection</CODE> object
511 * it creates.
512 * <P>
513 * Alternatively, a client can set a connection's client identifier using a
514 * provider-specific value. The facility to set a connection's client
515 * identifier explicitly is not a mechanism for overriding the identifier
516 * that has been administratively configured. It is provided for the case
517 * where no administratively specified identifier exists. If one does exist,
518 * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
519 * If a client sets the client identifier explicitly, it must do so
520 * immediately after it creates the connection and before any other action
521 * on the connection is taken. After this point, setting the client
522 * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
523 * <P>
524 * The purpose of the client identifier is to associate a connection and its
525 * objects with a state maintained on behalf of the client by a provider.
526 * The only such state identified by the JMS API is that required to support
527 * durable subscriptions.
528 * <P>
529 * If another connection with the same <code>clientID</code> is already
530 * running when this method is called, the JMS provider should detect the
531 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
532 *
533 * @param newClientID
534 * the unique client identifier
535 * @throws JMSException
536 * if the JMS provider fails to set the client ID for this
537 * connection due to some internal error.
538 * @throws javax.jms.InvalidClientIDException
539 * if the JMS client specifies an invalid or duplicate client
540 * ID.
541 * @throws javax.jms.IllegalStateException
542 * if the JMS client attempts to set a connection's client ID at
543 * the wrong time or when it has been administratively
544 * configured.
545 */
546 public void setClientID(String newClientID) throws JMSException {
547 if (this.clientIDSet) {
548 throw new IllegalStateException("The clientID has already been set");
549 }
550 if (this.isConnectionInfoSentToBroker) {
551 throw new IllegalStateException(
552 "Setting clientID on a used Connection is not allowed");
553 }
554 checkClosed();
555 this.clientID = newClientID;
556 this.userSpecifiedClientID = true;
557 ensureClientIDInitialised();
558 }
559
560 /**
561 * Gets the metadata for this connection.
562 *
563 * @return the connection metadata
564 * @throws JMSException
565 * if the JMS provider fails to get the connection metadata for
566 * this connection.
567 * @see javax.jms.ConnectionMetaData
568 */
569 public ConnectionMetaData getMetaData() throws JMSException {
570 checkClosed();
571 return this.connectionMetaData;
572 }
573
574 /**
575 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
576 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
577 * associated with it.
578 *
579 * @return the <CODE>ExceptionListener</CODE> for this connection, or
580 * null. if no <CODE>ExceptionListener</CODE> is associated with
581 * this connection.
582 * @throws JMSException
583 * if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
584 * for this connection.
585 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
586 */
587 public ExceptionListener getExceptionListener() throws JMSException {
588 checkClosed();
589 return this.exceptionListener;
590 }
591
592 /**
593 * Sets an exception listener for this connection.
594 * <P>
595 * If a JMS provider detects a serious problem with a connection, it informs
596 * the connection's <CODE> ExceptionListener</CODE>, if one has been
597 * registered. It does this by calling the listener's <CODE>onException
598 * </CODE> method, passing it a <CODE>JMSException</CODE> object
599 * describing the problem.
600 * <P>
601 * An exception listener allows a client to be notified of a problem
602 * asynchronously. Some connections only consume messages, so they would
603 * have no other way to learn their connection has failed.
604 * <P>
605 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
606 * <P>
607 * A JMS provider should attempt to resolve connection problems itself
608 * before it notifies the client of them.
609 *
610 * @param listener
611 * the exception listener
612 * @throws JMSException
613 * if the JMS provider fails to set the exception listener for
614 * this connection.
615 */
616 public void setExceptionListener(ExceptionListener listener)
617 throws JMSException {
618 checkClosed();
619 this.exceptionListener = listener;
620 this.transportChannel.setExceptionListener(listener);
621 }
622
623 /**
624 * Starts (or restarts) a connection's delivery of incoming messages. A call
625 * to <CODE>start</CODE> on a connection that has already been started is
626 * ignored.
627 *
628 * @throws JMSException
629 * if the JMS provider fails to start message delivery due to
630 * some internal error.
631 * @see javax.jms.Connection#stop()
632 */
633 public void start() throws JMSException {
634 checkClosed();
635 if (started.commit(false, true)) {
636 // We have a change in connection info to send.
637 // send the Connection info again
638 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
639 for (Iterator i = sessions.iterator(); i.hasNext();) {
640 ActiveMQSession s = (ActiveMQSession) i.next();
641 s.start();
642 }
643 }
644 }
645
646 /**
647 * @return true if this Connection is started
648 */
649 protected boolean isStarted() {
650 return started.get();
651 }
652
653 /**
654 * Temporarily stops a connection's delivery of incoming messages. Delivery
655 * can be restarted using the connection's <CODE>start</CODE> method. When
656 * the connection is stopped, delivery to all the connection's message
657 * consumers is inhibited: synchronous receives block, and messages are not
658 * delivered to message listeners.
659 * <P>
660 * This call blocks until receives and/or message listeners in progress have
661 * completed.
662 * <P>
663 * Stopping a connection has no effect on its ability to send messages. A
664 * call to <CODE>stop</CODE> on a connection that has already been stopped
665 * is ignored.
666 * <P>
667 * A call to <CODE>stop</CODE> must not return until delivery of messages
668 * has paused. This means that a client can rely on the fact that none of
669 * its message listeners will be called and that all threads of control
670 * waiting for <CODE>receive</CODE> calls to return will not return with a
671 * message until the connection is restarted. The receive timers for a
672 * stopped connection continue to advance, so receives may time out while
673 * the connection is stopped.
674 * <P>
675 * If message listeners are running when <CODE>stop</CODE> is invoked, the
676 * <CODE>stop</CODE> call must wait until all of them have returned before
677 * it may return. While these message listeners are completing, they must
678 * have the full services of the connection available to them.
679 *
680 * @throws JMSException
681 * if the JMS provider fails to stop message delivery due to
682 * some internal error.
683 * @see javax.jms.Connection#start()
684 */
685 public void stop() throws JMSException {
686 checkClosed();
687 if (started.commit(true, false)) {
688 for (Iterator i = sessions.iterator(); i.hasNext();) {
689 ActiveMQSession s = (ActiveMQSession) i.next();
690 s.stop();
691 }
692 sendConnectionInfoToBroker(2000, true, false);
693 }
694 }
695
696 /**
697 * Closes the connection.
698 * <P>
699 * Since a provider typically allocates significant resources outside the
700 * JVM on behalf of a connection, clients should close these resources when
701 * they are not needed. Relying on garbage collection to eventually reclaim
702 * these resources may not be timely enough.
703 * <P>
704 * There is no need to close the sessions, producers, and consumers of a
705 * closed connection.
706 * <P>
707 * Closing a connection causes all temporary destinations to be deleted.
708 * <P>
709 * When this method is invoked, it should not return until message
710 * processing has been shut down in an orderly fashion. This means that all
711 * message listeners that may have been running have returned, and that all
712 * pending receives have returned. A close terminates all pending message
713 * receives on the connection's sessions' consumers. The receives may return
714 * with a message or with null, depending on whether there was a message
715 * available at the time of the close. If one or more of the connection's
716 * sessions' message listeners is processing a message at the time when
717 * connection <CODE>close</CODE> is invoked, all the facilities of the
718 * connection and its sessions must remain available to those listeners
719 * until they return control to the JMS provider.
720 * <P>
721 * Closing a connection causes any of its sessions' transactions in progress
722 * to be rolled back. In the case where a session's work is coordinated by
723 * an external transaction manager, a session's <CODE>commit</CODE> and
724 * <CODE> rollback</CODE> methods are not used and the result of a closed
725 * session's work is determined later by the transaction manager. Closing a
726 * connection does NOT force an acknowledgment of client-acknowledged
727 * sessions.
728 * <P>
729 * Invoking the <CODE>acknowledge</CODE> method of a received message from
730 * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
731 * Closing a closed connection must NOT throw an exception.
732 *
733 * @throws JMSException
734 * if the JMS provider fails to close the connection due to some
735 * internal error. For example, a failure to release resources
736 * or to close a socket connection can cause this exception to
737 * be thrown.
738 */
739 public void close() throws JMSException {
740 this.transportChannel.setPendingStop(true);
741 synchronized (this) {
742 if (!closed) {
743 memoryManager.removeCapacityEventListener(this);
744 try {
745 closeTemporaryDestinations();
746 for (Iterator i = this.sessions.iterator(); i.hasNext();) {
747 ActiveMQSession s = (ActiveMQSession) i.next();
748 s.close();
749 }
750 for (Iterator i = this.connectionConsumers.iterator(); i
751 .hasNext();) {
752 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
753 .next();
754 c.close();
755 }
756 try {
757 sendConnectionInfoToBroker(sendCloseTimeout, true, true);
758 } catch (TimeoutExpiredException e) {
759 log
760 .warn("Failed to send close to broker, timeout expired of: "
761 + sendCloseTimeout + " millis");
762 }
763 this.connectionConsumers.clear();
764 this.messageDispatchers.clear();
765 this.transportChannel.stop();
766 } finally {
767 this.sessions.clear();
768 started.set(false);
769 factory.onConnectionClose(this);
770 }
771 closed = true;
772 transientConsumedRedeliverCache.clear();
773 validDestinationsMap.clear();
774 factoryStats.removeConnection(this);
775 }
776 }
777
778 }
779
780 /**
781 * Tells the broker to terminate its VM. This can be used to cleanly terminate a broker running in
782 * a standalone java process. Server must have property enable.vm.shutdown=true defined
783 * to allow this to work.
784 */
785 public void terminateBrokerVM() throws JMSException {
786 BrokerAdminCommand command = new BrokerAdminCommand();
787 command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
788 asyncSendPacket(command);
789 }
790
791 /**
792 * simply throws an exception if the Connection is already closed
793 *
794 * @throws JMSException
795 */
796 protected synchronized void checkClosed() throws JMSException {
797 if (!startedTransport) {
798 startedTransport = true;
799 this.transportChannel.setCachingEnabled(isCachingEnabled());
800 if (useAsyncSend == false) {
801 this.transportChannel.setNoDelay(true);
802 }
803
804 this.transportChannel.setUsedInternally(internalConnection);
805 this.transportChannel.start();
806 if (transportChannel.doesSupportWireFormatVersioning()) {
807 WireFormatInfo info = new WireFormatInfo();
808 info.setVersion(transportChannel.getCurrentWireFormatVersion());
809 this.asyncSendPacket(info);
810 }
811 }
812 if (this.closed) {
813 throw new ConnectionClosedException();
814 }
815 }
816
817 /**
818 * Creates a connection consumer for this connection (optional operation).
819 * This is an expert facility not used by regular JMS clients.
820 *
821 * @param destination
822 * the destination to access
823 * @param messageSelector
824 * only messages with properties matching the message selector
825 * expression are delivered. A value of null or an empty string
826 * indicates that there is no message selector for the message
827 * consumer.
828 * @param sessionPool
829 * the server session pool to associate with this connection
830 * consumer
831 * @param maxMessages
832 * the maximum number of messages that can be assigned to a
833 * server session at one time
834 * @return the connection consumer
835 * @throws JMSException
836 * if the <CODE>Connection</CODE> object fails to create a
837 * connection consumer due to some internal error or invalid
838 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
839 * @throws javax.jms.InvalidDestinationException
840 * if an invalid destination is specified.
841 * @throws javax.jms.InvalidSelectorException
842 * if the message selector is invalid.
843 * @see javax.jms.ConnectionConsumer
844 * @since 1.1
845 */
846 public ConnectionConsumer createConnectionConsumer(Destination destination,
847 String messageSelector, ServerSessionPool sessionPool,
848 int maxMessages) throws JMSException {
849 checkClosed();
850 ensureClientIDInitialised();
851 ConsumerInfo info = new ConsumerInfo();
852 info.setConsumerId(handleIdGenerator.generateId());
853 info.setDestination(ActiveMQMessageTransformation
854 .transformDestination(destination));
855 info.setSelector(messageSelector);
856 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
857 return new ActiveMQConnectionConsumer(this, sessionPool, info,
858 maxMessages);
859 }
860
861 /**
862 * Creates a connection consumer for this connection (optional operation).
863 * This is an expert facility not used by regular JMS clients.
864 *
865 * @param destination
866 * the destination to access
867 * @param messageSelector
868 * only messages with properties matching the message selector
869 * expression are delivered. A value of null or an empty string
870 * indicates that there is no message selector for the message
871 * consumer.
872 * @param sessionPool
873 * the server session pool to associate with this connection
874 * consumer
875 * @param maxMessages
876 * the maximum number of messages that can be assigned to a
877 * server session at one time
878 * @param noLocal
879 * set true if you want to filter out messages published locally
880 *
881 * @return the connection consumer
882 * @throws JMSException
883 * if the <CODE>Connection</CODE> object fails to create a
884 * connection consumer due to some internal error or invalid
885 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
886 * @throws javax.jms.InvalidDestinationException
887 * if an invalid destination is specified.
888 * @throws javax.jms.InvalidSelectorException
889 * if the message selector is invalid.
890 * @see javax.jms.ConnectionConsumer
891 * @since 1.1
892 */
893 public ConnectionConsumer createConnectionConsumer(Destination destination,
894 String messageSelector, ServerSessionPool sessionPool,
895 int maxMessages, boolean noLocal) throws JMSException {
896
897 checkClosed();
898 ensureClientIDInitialised();
899 ConsumerInfo info = new ConsumerInfo();
900 info.setConsumerId(handleIdGenerator.generateId());
901 info.setDestination(ActiveMQMessageTransformation
902 .transformDestination(destination));
903 info.setSelector(messageSelector);
904 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
905 info.setNoLocal(noLocal);
906 return new ActiveMQConnectionConsumer(this, sessionPool, info,
907 maxMessages);
908 }
909
910
911
912 /**
913 * Create a durable connection consumer for this connection (optional
914 * operation). This is an expert facility not used by regular JMS clients.
915 *
916 * @param topic
917 * topic to access
918 * @param subscriptionName
919 * durable subscription name
920 * @param messageSelector
921 * only messages with properties matching the message selector
922 * expression are delivered. A value of null or an empty string
923 * indicates that there is no message selector for the message
924 * consumer.
925 * @param sessionPool
926 * the server session pool to associate with this durable
927 * connection consumer
928 * @param maxMessages
929 * the maximum number of messages that can be assigned to a
930 * server session at one time
931 * @return the durable connection consumer
932 * @throws JMSException
933 * if the <CODE>Connection</CODE> object fails to create a
934 * connection consumer due to some internal error or invalid
935 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
936 * @throws javax.jms.InvalidDestinationException
937 * if an invalid destination is specified.
938 * @throws javax.jms.InvalidSelectorException
939 * if the message selector is invalid.
940 * @see javax.jms.ConnectionConsumer
941 * @since 1.1
942 */
943 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
944 String subscriptionName, String messageSelector,
945 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
946 checkClosed();
947 ensureClientIDInitialised();
948 ConsumerInfo info = new ConsumerInfo();
949 info.setConsumerId(this.handleIdGenerator.generateId());
950 info.setDestination(ActiveMQMessageTransformation
951 .transformDestination(topic));
952 info.setSelector(messageSelector);
953 info.setConsumerName(subscriptionName);
954 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
955 return new ActiveMQConnectionConsumer(this, sessionPool, info,
956 maxMessages);
957 }
958
959 /**
960 * Create a durable connection consumer for this connection (optional
961 * operation). This is an expert facility not used by regular JMS clients.
962 *
963 * @param topic
964 * topic to access
965 * @param subscriptionName
966 * durable subscription name
967 * @param messageSelector
968 * only messages with properties matching the message selector
969 * expression are delivered. A value of null or an empty string
970 * indicates that there is no message selector for the message
971 * consumer.
972 * @param sessionPool
973 * the server session pool to associate with this durable
974 * connection consumer
975 * @param maxMessages
976 * the maximum number of messages that can be assigned to a
977 * server session at one time
978 * @param noLocal
979 * set true if you want to filter out messages published locally
980 *
981 * @return the durable connection consumer
982 * @throws JMSException
983 * if the <CODE>Connection</CODE> object fails to create a
984 * connection consumer due to some internal error or invalid
985 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
986 * @throws javax.jms.InvalidDestinationException
987 * if an invalid destination is specified.
988 * @throws javax.jms.InvalidSelectorException
989 * if the message selector is invalid.
990 * @see javax.jms.ConnectionConsumer
991 * @since 1.1
992 */
993 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
994 String subscriptionName, String messageSelector,
995 ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
996 checkClosed();
997 ensureClientIDInitialised();
998 ConsumerInfo info = new ConsumerInfo();
999 info.setConsumerId(this.handleIdGenerator.generateId());
1000 info.setDestination(ActiveMQMessageTransformation
1001 .transformDestination(topic));
1002 info.setSelector(messageSelector);
1003 info.setConsumerName(subscriptionName);
1004 info.setNoLocal(noLocal);
1005 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1006 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1007 maxMessages);
1008 }
1009
1010 /**
1011 * Implementation of the PacketListener interface - consume a packet
1012 *
1013 * @param packet -
1014 * the Packet to consume
1015 * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1016 */
1017 public void consume(Packet packet) {
1018 if (!closed && packet != null) {
1019 if (packet.isJMSMessage()) {
1020 ActiveMQMessage message = (ActiveMQMessage) packet;
1021 message.setReadOnly(true);
1022 message.setConsumerIdentifer(clientID);
1023
1024 // lets check for expired messages which is only relevant for
1025 // multicast based stuff
1026 // as a pointcast based network should filter out this stuff
1027 if (transportChannel.isMulticast()) {
1028 long expiration = message.getJMSExpiration();
1029 if (expiration > 0) {
1030 long timeStamp = System.currentTimeMillis();
1031 if (timeStamp > expiration) {
1032 if (log.isDebugEnabled()) {
1033 log.debug("Discarding expired message: "
1034 + message);
1035 }
1036 return;
1037 }
1038 }
1039 }
1040
1041 try {
1042 int count = 0;
1043 for (Iterator i = this.messageDispatchers.iterator(); i
1044 .hasNext();) {
1045 ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i
1046 .next();
1047 if (dispatcher.isTarget(message)) {
1048 if (count > 0) {
1049 // separate message for each Session etc.
1050 message = message.deepCopy();
1051 }
1052 dispatcher.dispatch(message);
1053 count++;
1054 }
1055 }
1056 } catch (JMSException jmsEx) {
1057 handleAsyncException(jmsEx);
1058 }
1059 } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1060 CapacityInfo info = (CapacityInfo) packet;
1061 flowControlSleepTime = info.getFlowControlTimeout();
1062 // System.out.println("SET FLOW TIMEOUT = " +
1063 // flowControlSleepTime + " FOR " + info);
1064 } else if (packet.getPacketType() == Packet.KEEP_ALIVE
1065 && packet.isReceiptRequired()) {
1066 Receipt receipt = new Receipt();
1067 receipt.setCorrelationId(packet.getId());
1068 receipt.setReceiptRequired(false);
1069 try {
1070 asyncSendPacket(receipt);
1071 } catch (JMSException jmsEx) {
1072 handleAsyncException(jmsEx);
1073 }
1074 }
1075 }
1076 }
1077
1078 /**
1079 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1080 */
1081 public void onException(JMSException jmsEx) {
1082 // Got an exception propagated up from the transport channel
1083 handleAsyncException(jmsEx);
1084 isTransportOK = false;
1085 try {
1086 close();
1087 } catch (JMSException ex) {
1088 log.debug("Exception closing the connection", ex);
1089 }
1090 }
1091
1092 /**
1093 * Creates a <CODE>TopicSession</CODE> object.
1094 *
1095 * @param transacted
1096 * indicates whether the session is transacted
1097 * @param acknowledgeMode
1098 * indicates whether the consumer or the client will acknowledge
1099 * any messages it receives; ignored if the session is
1100 * transacted. Legal values are
1101 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1102 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1103 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1104 * @return a newly created topic session
1105 * @throws JMSException
1106 * if the <CODE>TopicConnection</CODE> object fails to create
1107 * a session due to some internal error or lack of support for
1108 * the specific transaction and acknowledgement mode.
1109 * @see Session#AUTO_ACKNOWLEDGE
1110 * @see Session#CLIENT_ACKNOWLEDGE
1111 * @see Session#DUPS_OK_ACKNOWLEDGE
1112 */
1113 public TopicSession createTopicSession(boolean transacted,
1114 int acknowledgeMode) throws JMSException {
1115 return new ActiveMQTopicSession((ActiveMQSession) createSession(
1116 transacted, acknowledgeMode));
1117 }
1118
1119 /**
1120 * Creates a connection consumer for this connection (optional operation).
1121 * This is an expert facility not used by regular JMS clients.
1122 *
1123 * @param topic
1124 * the topic to access
1125 * @param messageSelector
1126 * only messages with properties matching the message selector
1127 * expression are delivered. A value of null or an empty string
1128 * indicates that there is no message selector for the message
1129 * consumer.
1130 * @param sessionPool
1131 * the server session pool to associate with this connection
1132 * consumer
1133 * @param maxMessages
1134 * the maximum number of messages that can be assigned to a
1135 * server session at one time
1136 * @return the connection consumer
1137 * @throws JMSException
1138 * if the <CODE>TopicConnection</CODE> object fails to create
1139 * a connection consumer due to some internal error or invalid
1140 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1141 * @throws InvalidDestinationException
1142 * if an invalid topic is specified.
1143 * @throws InvalidSelectorException
1144 * if the message selector is invalid.
1145 * @see javax.jms.ConnectionConsumer
1146 */
1147 public ConnectionConsumer createConnectionConsumer(Topic topic,
1148 String messageSelector, ServerSessionPool sessionPool,
1149 int maxMessages) throws JMSException {
1150 checkClosed();
1151 ensureClientIDInitialised();
1152 ConsumerInfo info = new ConsumerInfo();
1153 info.setConsumerId(this.handleIdGenerator.generateId());
1154 info.setDestination(ActiveMQMessageTransformation
1155 .transformDestination(topic));
1156 info.setSelector(messageSelector);
1157 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1158 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1159 maxMessages);
1160 }
1161
1162 /**
1163 * Creates a <CODE>QueueSession</CODE> object.
1164 *
1165 * @param transacted
1166 * indicates whether the session is transacted
1167 * @param acknowledgeMode
1168 * indicates whether the consumer or the client will acknowledge
1169 * any messages it receives; ignored if the session is
1170 * transacted. Legal values are
1171 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1172 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1173 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1174 * @return a newly created queue session
1175 * @throws JMSException
1176 * if the <CODE>QueueConnection</CODE> object fails to create
1177 * a session due to some internal error or lack of support for
1178 * the specific transaction and acknowledgement mode.
1179 * @see Session#AUTO_ACKNOWLEDGE
1180 * @see Session#CLIENT_ACKNOWLEDGE
1181 * @see Session#DUPS_OK_ACKNOWLEDGE
1182 */
1183 public QueueSession createQueueSession(boolean transacted,
1184 int acknowledgeMode) throws JMSException {
1185 return new ActiveMQQueueSession((ActiveMQSession) createSession(
1186 transacted, acknowledgeMode));
1187 }
1188
1189 /**
1190 * Creates a connection consumer for this connection (optional operation).
1191 * This is an expert facility not used by regular JMS clients.
1192 *
1193 * @param queue
1194 * the queue to access
1195 * @param messageSelector
1196 * only messages with properties matching the message selector
1197 * expression are delivered. A value of null or an empty string
1198 * indicates that there is no message selector for the message
1199 * consumer.
1200 * @param sessionPool
1201 * the server session pool to associate with this connection
1202 * consumer
1203 * @param maxMessages
1204 * the maximum number of messages that can be assigned to a
1205 * server session at one time
1206 * @return the connection consumer
1207 * @throws JMSException
1208 * if the <CODE>QueueConnection</CODE> object fails to create
1209 * a connection consumer due to some internal error or invalid
1210 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1211 * @throws InvalidDestinationException
1212 * if an invalid queue is specified.
1213 * @throws InvalidSelectorException
1214 * if the message selector is invalid.
1215 * @see javax.jms.ConnectionConsumer
1216 */
1217 public ConnectionConsumer createConnectionConsumer(Queue queue,
1218 String messageSelector, ServerSessionPool sessionPool,
1219 int maxMessages) throws JMSException {
1220 checkClosed();
1221 ensureClientIDInitialised();
1222 ConsumerInfo info = new ConsumerInfo();
1223 info.setConsumerId(this.handleIdGenerator.generateId());
1224 info.setDestination(ActiveMQMessageTransformation
1225 .transformDestination(queue));
1226 info.setSelector(messageSelector);
1227 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1228 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1229 maxMessages);
1230 }
1231
1232 /**
1233 * Ensures that the clientID was manually specified and not auto-generated.
1234 * If the clientID was not specified this method will throw an exception.
1235 * This method is used to ensure that the clientID + durableSubscriber name
1236 * are used correctly.
1237 *
1238 * @throws JMSException
1239 */
1240 public void checkClientIDWasManuallySpecified() throws JMSException {
1241 if (!userSpecifiedClientID) {
1242 throw new JMSException(
1243 "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1244 }
1245 }
1246
1247 /**
1248 * handle disconnect/reconnect events
1249 *
1250 * @param event
1251 */
1252 public void statusChanged(TransportStatusEvent event) {
1253 log.info("channel status changed: " + event);
1254 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1255 isTransportOK = true;
1256 doReconnect();
1257
1258 } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1259 isTransportOK = false;
1260 clearMessagesInProgress();
1261 }
1262 }
1263
1264 /**
1265 * send a Packet through the Connection - for internal use only
1266 *
1267 * @param packet
1268 * @throws JMSException
1269 */
1270 public void asyncSendPacket(Packet packet) throws JMSException {
1271 asyncSendPacket(packet, true);
1272 }
1273
1274 /**
1275 * send a Packet through the Connection - for internal use only
1276 *
1277 * @param packet
1278 * @param doSendWhileReconnecting
1279 * @throws JMSException
1280 */
1281 public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting)
1282 throws JMSException {
1283 if (isTransportOK
1284 && !closed
1285 && (doSendWhileReconnecting || transportChannel
1286 .isTransportConnected())) {
1287 packet.setId(packetIdGenerator.getNextShortSequence());
1288 packet.setReceiptRequired(false);
1289 if (packet.isJMSMessage() && flowControlSleepTime > 0) {
1290 try {
1291 Thread.sleep(flowControlSleepTime);
1292 } catch (InterruptedException e) {
1293 }
1294 }
1295 this.transportChannel.asyncSend(packet);
1296 }
1297 }
1298
1299 /**
1300 * send a Packet through a Connection - for internal use only
1301 *
1302 * @param packet
1303 * @throws JMSException
1304 */
1305 public void syncSendPacket(Packet packet) throws JMSException {
1306 syncSendPacket(packet, 0);
1307 }
1308
1309 /**
1310 * Send a packet through a Connection - for internal use only
1311 *
1312 * @param packet
1313 * @param timeout
1314 * @throws JMSException
1315 */
1316 public void syncSendPacket(Packet packet, int timeout) throws JMSException {
1317 if (isTransportOK && !closed) {
1318 Receipt receipt;
1319 packet.setId(packetIdGenerator.getNextShortSequence());
1320 packet.setReceiptRequired(true);
1321 receipt = this.transportChannel.send(packet, timeout);
1322 if (receipt != null) {
1323 if (receipt.isFailed()) {
1324 Throwable e = receipt.getException();
1325 if (e != null) {
1326 throw JMSExceptionHelper.newJMSException(e);
1327 }
1328 throw new JMSException(
1329 "syncSendPacket failed with unknown exception");
1330 }
1331 }
1332 } else {
1333 if (closed) {
1334 throw new ConnectionClosedException();
1335 } else {
1336 throw new JMSException(
1337 "syncSendTimedOut: connection no longer OK");
1338 }
1339 }
1340 }
1341
1342 public Receipt syncSendRequest(Packet packet) throws JMSException {
1343 checkClosed();
1344 if (isTransportOK && !closed) {
1345 Receipt receipt;
1346 packet.setReceiptRequired(true);
1347 packet.setId(this.packetIdGenerator.getNextShortSequence());
1348
1349 receipt = this.transportChannel.send(packet);
1350 if (receipt != null && receipt.isFailed()) {
1351 Throwable e = receipt.getException();
1352 if (e != null) {
1353 throw (JMSException) new JMSException(e.getMessage())
1354 .initCause(e);
1355 }
1356 throw new JMSException(
1357 "syncSendPacket failed with unknown exception");
1358 }
1359 return receipt;
1360 } else {
1361 if (closed) {
1362 throw new ConnectionClosedException();
1363 } else {
1364 throw new JMSException(
1365 "syncSendTimedOut: connection no longer OK");
1366 }
1367 }
1368 }
1369
1370 // Properties
1371 // -------------------------------------------------------------------------
1372
1373 /**
1374 * @return Returns the prefetchPolicy.
1375 */
1376 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
1377 return prefetchPolicy;
1378 }
1379
1380 /**
1381 * @param prefetchPolicy
1382 * The prefetchPolicy to set.
1383 */
1384 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
1385 this.prefetchPolicy = prefetchPolicy;
1386 }
1387
1388 public int getSendCloseTimeout() {
1389 return sendCloseTimeout;
1390 }
1391
1392 public void setSendCloseTimeout(int sendCloseTimeout) {
1393 this.sendCloseTimeout = sendCloseTimeout;
1394 }
1395
1396 public int getSendConnectionInfoTimeout() {
1397 return sendConnectionInfoTimeout;
1398 }
1399
1400 public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
1401 this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
1402 }
1403
1404 public TransportChannel getTransportChannel() {
1405 return transportChannel;
1406 }
1407
1408 /**
1409 * Returns the clientID of the connection, forcing one to be generated if
1410 * one has not yet been configured
1411 */
1412 public String getInitializedClientID() throws JMSException {
1413 ensureClientIDInitialised();
1414 return this.clientID;
1415 }
1416
1417 // Implementation methods
1418 // -------------------------------------------------------------------------
1419
1420 /**
1421 * Used internally for adding Sessions to the Connection
1422 *
1423 * @param session
1424 * @throws JMSException
1425 */
1426 protected void addSession(ActiveMQSession session) throws JMSException {
1427 this.sessions.add(session);
1428 addMessageDispatcher(session);
1429 if (started.get()) {
1430 session.start();
1431 }
1432 SessionInfo info = createSessionInfo(session);
1433 info.setStarted(true);
1434 asyncSendPacket(info);
1435 }
1436
1437 /**
1438 * Used interanlly for removing Sessions from a Connection
1439 *
1440 * @param session
1441 * @throws JMSException
1442 */
1443 protected void removeSession(ActiveMQSession session) throws JMSException {
1444 this.sessions.remove(session);
1445 removeMessageDispatcher(session);
1446 SessionInfo info = createSessionInfo(session);
1447 info.setStarted(false);
1448 asyncSendPacket(info, false);
1449 }
1450
1451 private SessionInfo createSessionInfo(ActiveMQSession session) {
1452 SessionInfo info = new SessionInfo();
1453 info.setClientId(clientID);
1454 info.setSessionId(session.getSessionId());
1455 info.setStartTime(session.getStartTime());
1456 return info;
1457 }
1458
1459 /**
1460 * Add a ConnectionConsumer
1461 *
1462 * @param connectionConsumer
1463 * @throws JMSException
1464 */
1465 protected void addConnectionConsumer(
1466 ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1467 this.connectionConsumers.add(connectionConsumer);
1468 addMessageDispatcher(connectionConsumer);
1469 }
1470
1471 /**
1472 * Remove a ConnectionConsumer
1473 *
1474 * @param connectionConsumer
1475 */
1476 protected void removeConnectionConsumer(
1477 ActiveMQConnectionConsumer connectionConsumer) {
1478 this.connectionConsumers.add(connectionConsumer);
1479 removeMessageDispatcher(connectionConsumer);
1480 }
1481
1482 /**
1483 * Add a Message dispatcher to receive messages from the Broker
1484 *
1485 * @param messageDispatch
1486 * @throws JMSException
1487 * if an internal error
1488 */
1489 protected void addMessageDispatcher(
1490 ActiveMQMessageDispatcher messageDispatch) throws JMSException {
1491 this.messageDispatchers.add(messageDispatch);
1492 }
1493
1494 /**
1495 * Remove a Message dispatcher
1496 *
1497 * @param messageDispatcher
1498 */
1499 protected void removeMessageDispatcher(
1500 ActiveMQMessageDispatcher messageDispatcher) {
1501 this.messageDispatchers.remove(messageDispatcher);
1502 }
1503
1504 /**
1505 * Used for handling async exceptions
1506 *
1507 * @param jmsEx
1508 */
1509 protected void handleAsyncException(JMSException jmsEx) {
1510 if (!closed) {
1511 if (this.exceptionListener != null) {
1512 this.exceptionListener.onException(jmsEx);
1513 } else {
1514 log.warn(
1515 "Async exception with no exception listener: " + jmsEx,
1516 jmsEx);
1517 }
1518 }
1519 }
1520
1521 protected void sendConnectionInfoToBroker() throws JMSException {
1522 sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false);
1523 }
1524
1525 /**
1526 * Send the ConnectionInfo to the Broker
1527 *
1528 * @param timeout
1529 * @param isClosed
1530 * @throws JMSException
1531 */
1532 protected void sendConnectionInfoToBroker(int timeout, boolean forceResend,
1533 boolean closing) throws JMSException {
1534 // Can we skip sending the ConnectionInfo packet??
1535 if (isConnectionInfoSentToBroker && !forceResend) {
1536 return;
1537 }
1538
1539 this.isConnectionInfoSentToBroker = true;
1540 ensureClientIDInitialised();
1541 ConnectionInfo info = new ConnectionInfo();
1542 info.setClientId(this.clientID);
1543 info.setHostName(IdGenerator.getHostName());
1544 info.setUserName(userName);
1545 info.setPassword(password);
1546 info.setStartTime(startTime);
1547 info.setStarted(started.get());
1548 info.setClosed(closed || closing);
1549 info.setClientVersion(connectionMetaData.getProviderVersion());
1550 info.setWireFormatVersion(transportChannel
1551 .getCurrentWireFormatVersion());
1552 if (info.getProperties() != null) {
1553 info.getProperties().setProperty(ConnectionInfo.NO_DELAY_PROPERTY,
1554 new Boolean(!useAsyncSend).toString());
1555 }
1556 if (quickClose && info.isClosed()) {
1557 asyncSendPacket(info);
1558 } else {
1559 syncSendPacket(info, timeout);
1560 }
1561 }
1562
1563 /**
1564 * Set the maximum amount of memory this Connection should use for buffered
1565 * inbound messages
1566 *
1567 * @param newMemoryLimit
1568 * the new memory limit in bytes
1569 */
1570 public void setConnectionMemoryLimit(int newMemoryLimit) {
1571 memoryManager.setValueLimit(newMemoryLimit);
1572 }
1573
1574 /**
1575 * Get the current value for the maximum amount of memory this Connection
1576 * should use for buffered inbound messages
1577 *
1578 * @return the current limit in bytes
1579 */
1580 public int getConnectionMemoryLimit() {
1581 return (int) memoryManager.getValueLimit();
1582 }
1583
1584 /**
1585 * CapacityMonitorEventListener implementation called when the capacity of a
1586 * CapacityService changes
1587 *
1588 * @param event
1589 */
1590 public void capacityChanged(CapacityMonitorEvent event) {
1591 // send the event to broker ...
1592 CapacityInfo info = new CapacityInfo();
1593 info.setResourceName(event.getMonitorName());
1594 info.setCapacity(event.getCapacity());
1595 // System.out.println("Cap changed: " + event);
1596 try {
1597 asyncSendPacket(info, false);
1598 } catch (JMSException e) {
1599 JMSException jmsEx = new JMSException(
1600 "failed to send change in capacity");
1601 jmsEx.setLinkedException(e);
1602 handleAsyncException(jmsEx);
1603 }
1604 }
1605
1606 /**
1607 * @return a number unique for this connection
1608 */
1609 protected int getNextConsumerNumber() {
1610 return this.consumerNumberGenerator.increment();
1611 }
1612
1613 protected short generateSessionId() {
1614 return this.sessionIdGenerator.getNextShortSequence();
1615 }
1616
1617 private synchronized void ensureClientIDInitialised() {
1618 if (this.clientID == null || this.clientID.trim().equals("")) {
1619 this.clientID = this.clientIdGenerator.generateId();
1620 }
1621 transportChannel.setClientID(clientID);
1622 this.clientIDSet = true;
1623 }
1624
1625 protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1626 return boundedQueueManager.getMemoryBoundedQueue(name);
1627 }
1628
1629 protected void doReconnect() {
1630 try {
1631 // send the Connection info again
1632 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
1633 for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1634 ActiveMQSession session = (ActiveMQSession) iter.next();
1635 SessionInfo sessionInfo = createSessionInfo(session);
1636 sessionInfo.setStarted(true);
1637 asyncSendPacket(sessionInfo, false);
1638 // send consumers
1639 for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator
1640 .hasNext();) {
1641 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator
1642 .next();
1643 ConsumerInfo consumerInfo = session
1644 .createConsumerInfo(consumer);
1645 consumerInfo.setStarted(true);
1646 asyncSendPacket(consumerInfo, false);
1647 }
1648 // send producers
1649 for (Iterator producersIterator = session.producers.iterator(); producersIterator
1650 .hasNext();) {
1651 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator
1652 .next();
1653 ProducerInfo producerInfo = session
1654 .createProducerInfo(producer);
1655 producerInfo.setStarted(true);
1656 asyncSendPacket(producerInfo, false);
1657 }
1658 // send the current capacity
1659 CapacityMonitorEvent event = memoryManager
1660 .generateCapacityMonitorEvent();
1661 if (event != null) {
1662 capacityChanged(event);
1663 }
1664 }
1665 } catch (JMSException jmsEx) {
1666 log.error("Failed to do reconnection");
1667 handleAsyncException(jmsEx);
1668 isTransportOK = false;
1669 }
1670 }
1671
1672 /**
1673 * @return Returns the useAsyncSend.
1674 */
1675 public boolean isUseAsyncSend() {
1676 return useAsyncSend;
1677 }
1678
1679 /**
1680 * @param useAsyncSend
1681 * The useAsyncSend to set.
1682 */
1683 public void setUseAsyncSend(boolean useAsyncSend) {
1684 this.useAsyncSend = useAsyncSend;
1685 }
1686
1687 /**
1688 * @return Returns the cachingEnabled.
1689 */
1690 public boolean isCachingEnabled() {
1691 return cachingEnabled;
1692 }
1693
1694 /**
1695 * @param cachingEnabled
1696 * The cachingEnabled to set.
1697 */
1698 public void setCachingEnabled(boolean cachingEnabled) {
1699 this.cachingEnabled = cachingEnabled;
1700 }
1701
1702 /**
1703 * @return Returns the j2EEcompliant.
1704 */
1705 public boolean isJ2EEcompliant() {
1706 return J2EEcompliant;
1707 }
1708
1709 /**
1710 * @param ecompliant
1711 * The j2EEcompliant to set.
1712 */
1713 public void setJ2EEcompliant(boolean ecompliant) {
1714 J2EEcompliant = ecompliant;
1715 }
1716
1717 /**
1718 * @return Returns the internalConnection.
1719 */
1720 public boolean isInternalConnection() {
1721 return internalConnection;
1722 }
1723
1724 /**
1725 * @param internalConnection
1726 * The internalConnection to set.
1727 */
1728 public void setInternalConnection(boolean internalConnection) {
1729 this.internalConnection = internalConnection;
1730 }
1731
1732 /**
1733 * @return Returns the doMessageCompression.
1734 */
1735 public boolean isDoMessageCompression() {
1736 return doMessageCompression
1737 && transportChannel.doesSupportMessageCompression();
1738 }
1739
1740 /**
1741 * @param doMessageCompression
1742 * The doMessageCompression to set.
1743 */
1744 public void setDoMessageCompression(boolean doMessageCompression) {
1745 this.doMessageCompression = doMessageCompression
1746 && transportChannel.doesSupportMessageCompression();
1747 }
1748
1749 /**
1750 * @return Returns the doMessageFragmentation.
1751 */
1752 public boolean isDoMessageFragmentation() {
1753 return doMessageFragmentation
1754 && transportChannel.doesSupportMessageFragmentation();
1755 }
1756
1757 /**
1758 * @param doMessageFragmentation
1759 * The doMessageFragmentation to set.
1760 */
1761 public void setDoMessageFragmentation(boolean doMessageFragmentation) {
1762 this.doMessageFragmentation = doMessageFragmentation
1763 && transportChannel.doesSupportMessageFragmentation();
1764 }
1765
1766 /**
1767 * @return Returns the messageCompressionLevel.
1768 */
1769 public int getMessageCompressionLevel() {
1770 return messageCompressionLevel;
1771 }
1772
1773 /**
1774 * @param messageCompressionLevel
1775 * The messageCompressionLevel to set.
1776 */
1777 public void setMessageCompressionLevel(int messageCompressionLevel) {
1778 this.messageCompressionLevel = messageCompressionLevel;
1779 }
1780
1781 /**
1782 * @return Returns the messageCompressionLimit.
1783 */
1784 public int getMessageCompressionLimit() {
1785 return messageCompressionLimit;
1786 }
1787
1788 /**
1789 * @param messageCompressionLimit
1790 * The messageCompressionLimit to set.
1791 */
1792 public void setMessageCompressionLimit(int messageCompressionLimit) {
1793 this.messageCompressionLimit = messageCompressionLimit;
1794 }
1795
1796 /**
1797 * @return Returns the messageCompressionStrategy.
1798 */
1799 public int getMessageCompressionStrategy() {
1800 return messageCompressionStrategy;
1801 }
1802
1803 /**
1804 * @param messageCompressionStrategy
1805 * The messageCompressionStrategy to set.
1806 */
1807 public void setMessageCompressionStrategy(int messageCompressionStrategy) {
1808 this.messageCompressionStrategy = messageCompressionStrategy;
1809 }
1810
1811 /**
1812 * @return Returns the messageFragmentationLimit.
1813 */
1814 public int getMessageFragmentationLimit() {
1815 return messageFragmentationLimit;
1816 }
1817
1818 /**
1819 * @param messageFragmentationLimit
1820 * The messageFragmentationLimit to set.
1821 */
1822 public void setMessageFragmentationLimit(int messageFragmentationLimit) {
1823 this.messageFragmentationLimit = messageFragmentationLimit;
1824 }
1825
1826 /**
1827 * @return Returns the disableTimeStampsByDefault.
1828 */
1829 public boolean isDisableTimeStampsByDefault() {
1830 return disableTimeStampsByDefault;
1831 }
1832
1833 /**
1834 * @param disableTimeStampsByDefault
1835 * The disableTimeStampsByDefault to set.
1836 */
1837 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
1838 this.disableTimeStampsByDefault = disableTimeStampsByDefault;
1839 }
1840
1841 /**
1842 * Causes pre-serialization of messages before send By default this is on
1843 *
1844 * @return Returns the prePrepareMessageOnSend.
1845 */
1846 public boolean isPrepareMessageBodyOnSend() {
1847 return prepareMessageBodyOnSend;
1848 }
1849
1850 /**
1851 * Causes pre-serialization of messages before send By default this is on
1852 *
1853 * @param prePrepareMessageOnSend
1854 * The prePrepareMessageOnSend to set.
1855 */
1856 public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
1857 this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
1858 }
1859
1860 /**
1861 * @return Returns the copyMessageOnSend.
1862 */
1863 public boolean isCopyMessageOnSend() {
1864 return copyMessageOnSend;
1865 }
1866
1867 /**
1868 * @param copyMessageOnSend
1869 * The copyMessageOnSend to set.
1870 */
1871 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
1872 this.copyMessageOnSend = copyMessageOnSend;
1873 }
1874
1875 /**
1876 * @return Returns the quickClose.
1877 */
1878 public boolean isQuickClose() {
1879 return quickClose;
1880 }
1881
1882 /**
1883 * @param quickClose
1884 * The quickClose to set.
1885 */
1886 public void setQuickClose(boolean quickClose) {
1887 this.quickClose = quickClose;
1888 }
1889
1890 /**
1891 * @return Returns the optimizedMessageDispatch.
1892 */
1893 public boolean isOptimizedMessageDispatch() {
1894 return optimizedMessageDispatch;
1895 }
1896
1897 /**
1898 * @param optimizedMessageDispatch
1899 * The optimizedMessageDispatch to set.
1900 */
1901 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
1902 this.optimizedMessageDispatch = optimizedMessageDispatch;
1903 }
1904
1905 protected void clearMessagesInProgress() {
1906 for (Iterator i = sessions.iterator(); i.hasNext();) {
1907 ActiveMQSession session = (ActiveMQSession) i.next();
1908 session.clearMessagesInProgress();
1909 }
1910 }
1911
1912 /**
1913 * Tells the broker to destroy a destination.
1914 *
1915 * @param destination
1916 */
1917 public void destroyDestination(ActiveMQDestination destination)
1918 throws JMSException {
1919 BrokerAdminCommand command = new BrokerAdminCommand();
1920 command.setCommand(BrokerAdminCommand.DESTROY_DESTINATION);
1921 command.setDestination(destination);
1922 syncSendPacket(command);
1923 }
1924
/**
 * Cleans up this connection so that its state is as if the connection was
 * just created. This allows the Resource Adapter to clean up a connection
 * so that it can be reused without having to close and recreate the
 * connection.
 * <p>
 * Closes all sessions and connection consumers, clears the related
 * collections, resets the started / client-id / connection-info flags,
 * removes the exception listener and finally sends a
 * CleanupConnectionInfo packet asynchronously.
 *
 * @throws JMSException
 *             propagated from closing a session or connection consumer,
 *             or from the calls that build and send the cleanup packet
 */
public void cleanup() throws JMSException {

    try {
        // Close every session currently registered on this connection.
        for (Iterator i = this.sessions.iterator(); i.hasNext();) {
            ActiveMQSession s = (ActiveMQSession) i.next();
            s.close();
        }
        // Then close every connection consumer.
        for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
            ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
                    .next();
            c.close();
        }
        this.connectionConsumers.clear();
        this.messageDispatchers.clear();
    } finally {
        // Runs even when a close() above throws: the session list is
        // always emptied and the started flag is always reset. Note that
        // in that failure case the consumer/dispatcher lists are NOT
        // cleared and the code after this block is skipped.
        this.sessions.clear();
        started.set(false);
    }

    setExceptionListener(null);
    clientIDSet = false;
    // changeUserInfo() refuses to run while this flag is set, so clearing
    // it is what makes a credential change possible after cleanup().
    isConnectionInfoSentToBroker = false;

    // NOTE(review): getClientID() is called after clientIDSet was reset;
    // presumably it still returns the previous client id - confirm.
    CleanupConnectionInfo cleanupInfo = new CleanupConnectionInfo();
    cleanupInfo.setClientId(getClientID());
    asyncSendPacket(cleanupInfo);
}
1960
1961 /**
1962 * Changes the associated username/password that is associated with this
1963 * connection. If the connection has been used, you must called cleanup()
1964 * before calling this method.
1965 *
1966 * @throws IllegalStateException
1967 * if the connection is in used.
1968 * @param sessionId
1969 */
1970 public void changeUserInfo(String theUserName, String thePassword)
1971 throws JMSException {
1972 if (isConnectionInfoSentToBroker)
1973 throw new IllegalStateException(
1974 "changeUserInfo used Connection is not allowed");
1975
1976 this.userName = theUserName;
1977 this.password = thePassword;
1978 }
1979
1980 protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) {
1981 transientConsumedRedeliverCache.add(message);
1982 }
1983
1984 protected void replayTransientConsumedRedeliveredMessages(
1985 ActiveMQSession session, ActiveMQMessageConsumer consumer)
1986 throws JMSException {
1987 if (consumer.getDestination().isTopic()
1988 && !transientConsumedRedeliverCache.isEmpty()) {
1989 Filter filter = getFilterFactory().createFilter(
1990 consumer.getDestination(), consumer.getMessageSelector());
1991 if (consumer.isNoLocal()) {
1992 filter = new AndFilter(filter, new NoLocalFilter(clientID));
1993 }
1994 for (Iterator i = transientConsumedRedeliverCache.iterator(); i
1995 .hasNext();) {
1996 ActiveMQMessage message = (ActiveMQMessage) i.next();
1997 if (filter.matches(message)) {
1998 transientConsumedRedeliverCache.remove(message);
1999 message.setMessageAcknowledge(session);
2000 message.setJMSRedelivered(true);
2001 message.setConsumerNos(new int[] { consumer
2002 .getConsumerNumber() });
2003 consumer.processMessage(message);
2004 }
2005 }
2006 }
2007 }
2008
2009 private FilterFactory getFilterFactory() {
2010 if (filterFactory == null) {
2011 filterFactory = new FilterFactoryImpl();
2012 }
2013 return filterFactory;
2014 }
2015
2016 protected void startTemporaryDestination(ActiveMQDestination dest)
2017 throws JMSException {
2018 if (dest != null && dest.isTemporary()) {
2019 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2020 .get(dest);
2021 if (event == null) {
2022 event = new TempDestinationAdvisoryEvent(dest, true);
2023 tempDestinationMap.put(dest, event);
2024 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2025 msg.setObject(event);
2026 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2027 msg.setJMSDestination(dest.getTopicForTempAdvisory());
2028 msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2029 + " .started");
2030 this.syncSendPacket(msg);
2031 }
2032 }
2033 }
2034
2035 protected void stopTemporaryDestination(ActiveMQDestination dest)
2036 throws JMSException {
2037 if (dest != null && dest.isTemporary()) {
2038 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2039 .remove(dest);
2040 if (event != null) {
2041 event.setStarted(false);
2042 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2043 msg.setObject(event);
2044 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2045 msg.setJMSDestination(dest.getTopicForTempAdvisory());
2046 msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2047 + " .stopped");
2048 this.syncSendPacket(msg);
2049 }
2050 }
2051 }
2052
2053 protected void closeTemporaryDestinations() throws JMSException {
2054 for (Iterator i = tempDestinationMap.keySet().iterator(); i.hasNext();) {
2055 ActiveMQDestination dest = (ActiveMQDestination) i.next();
2056 stopTemporaryDestination(dest);
2057 }
2058 }
2059
2060 protected void startAdvisoryForTempDestination(Destination d)
2061 throws JMSException {
2062 if (d != null) {
2063 ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2064 .transformDestination(d);
2065 if (dest.isTemporary()) {
2066 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2067 .get(dest);
2068 if (test == null) {
2069 test = new TempDestinationAdvisor(this, dest);
2070 test.start();
2071 validDestinationsMap.put(dest, test);
2072 }
2073 }
2074 }
2075 }
2076
2077 protected void stopAdvisoryForTempDestination(ActiveMQDestination d)
2078 throws JMSException {
2079 if (d != null) {
2080 ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2081 .transformDestination(d);
2082 if (dest.isTemporary()) {
2083 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2084 .remove(dest);
2085 if (test != null) {
2086 test.stop();
2087 }
2088 }
2089 }
2090 }
2091
2092 protected final void validateDestination(ActiveMQDestination dest)
2093 throws JMSException {
2094 if (dest != null) {
2095 if (dest.isTemporary()) {
2096 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2097 .get(dest);
2098 if (dest.isDeleted() || test == null || !test.isActive(dest)) {
2099 throw new JMSException(
2100 "Cannot publish to a deleted Destination: " + dest);
2101 }
2102 }
2103 }
2104 }
2105
2106 /**
2107 * @return Returns the resourceManagerId.
2108 * @throws JMSException
2109 */
2110 synchronized public String getResourceManagerId() throws JMSException {
2111 if (resourceManagerId == null) {
2112 resourceManagerId = determineResourceManagerId();
2113 }
2114 return resourceManagerId;
2115 }
2116
2117 /**
2118 * Get's the resource manager id.
2119 */
2120 private String determineResourceManagerId() throws JMSException {
2121
2122 XATransactionInfo info = new XATransactionInfo();
2123 info.setType(TransactionInfo.GET_RM_ID);
2124
2125 ResponseReceipt receipt = (ResponseReceipt) syncSendRequest(info);
2126 String rmId = (String) receipt.getResult();
2127 assert rmId != null;
2128 return rmId;
2129 }
2130
2131
2132 }