001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.broker.impl;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Set;
025
026 import javax.jms.ExceptionListener;
027 import javax.jms.JMSException;
028 import javax.security.auth.Subject;
029 import javax.transaction.xa.XAException;
030
031 import org.activemq.broker.BrokerAdmin;
032 import org.activemq.broker.BrokerClient;
033 import org.activemq.broker.BrokerConnector;
034 import org.activemq.io.util.SpooledBoundedActiveMQMessageQueue;
035 import org.activemq.message.ActiveMQMessage;
036 import org.activemq.message.ActiveMQXid;
037 import org.activemq.message.BrokerAdminCommand;
038 import org.activemq.message.BrokerInfo;
039 import org.activemq.message.CapacityInfo;
040 import org.activemq.message.CleanupConnectionInfo;
041 import org.activemq.message.ConnectionInfo;
042 import org.activemq.message.ConsumerInfo;
043 import org.activemq.message.DurableUnsubscribe;
044 import org.activemq.message.IntResponseReceipt;
045 import org.activemq.message.KeepAlive;
046 import org.activemq.message.MessageAck;
047 import org.activemq.message.Packet;
048 import org.activemq.message.PacketListener;
049 import org.activemq.message.ProducerInfo;
050 import org.activemq.message.Receipt;
051 import org.activemq.message.ResponseReceipt;
052 import org.activemq.message.SessionInfo;
053 import org.activemq.message.TransactionInfo;
054 import org.activemq.message.XATransactionInfo;
055 import org.activemq.transport.NetworkChannel;
056 import org.activemq.transport.NetworkConnector;
057 import org.activemq.transport.TransportChannel;
058 import org.activemq.util.IdGenerator;
059 import org.apache.commons.logging.Log;
060 import org.apache.commons.logging.LogFactory;
061
062 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
063 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
064 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
065
066 /**
067 * A Broker client side proxy representing a JMS Connnection
068 *
069 * @version $Revision: 1.1.1.1 $
070 */
071 public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
072
073 private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
074 private static final Log commandLog = LogFactory.getLog("org.activemq.broker.CommandTrace");
075
076 private BrokerConnector brokerConnector;
077 private TransportChannel channel;
078 private ConnectionInfo connectionInfo;
079 private IdGenerator packetIdGenerator;
080 private SynchronizedBoolean closed;
081 private Set activeConsumers;
082 private CopyOnWriteArrayList consumers;
083 private CopyOnWriteArrayList producers;
084 private CopyOnWriteArrayList transactions;
085 private CopyOnWriteArrayList sessions;
086 private SynchronizedBoolean started;
087 private boolean brokerConnection;
088 private boolean clusteredConnection;
089 private String remoteBrokerName;
090 private int capacity = 100;
091 private SpooledBoundedActiveMQMessageQueue spoolQueue;
092 private boolean cleanedUp;
093 private boolean registered;
094 private ArrayList dispatchQueue = new ArrayList();
095 private Subject subject;
096 private boolean remoteNetworkConnector;
097
098 /**
099 * Default Constructor of BrokerClientImpl
100 */
101 public BrokerClientImpl() {
102 this.packetIdGenerator = new IdGenerator();
103 this.closed = new SynchronizedBoolean(false);
104 this.started = new SynchronizedBoolean(false);
105 this.activeConsumers = new HashSet();
106 this.consumers = new CopyOnWriteArrayList();
107 this.producers = new CopyOnWriteArrayList();
108 this.transactions = new CopyOnWriteArrayList();
109 this.sessions = new CopyOnWriteArrayList();
110 }
111
112 /**
113 * Initialize the BrokerClient
114 *
115 * @param brokerConnector
116 * @param channel
117 */
118 public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
119 this.brokerConnector = brokerConnector;
120 this.channel = channel;
121 this.channel.setPacketListener(this);
122 this.channel.setExceptionListener(this);
123 log.trace("brokerConnectorConnector client initialized");
124 }
125
126 /**
127 * @return the BrokerConnector this client is associated with
128 */
129 public BrokerConnector getBrokerConnector() {
130 return this.brokerConnector;
131 }
132
133 /**
134 * @return the connection information for this client
135 */
136 public ConnectionInfo getConnectionInfo() {
137 return connectionInfo;
138 }
139
140 /**
141 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
142 */
143 public void onException(JMSException jmsEx) {
144 log.info("Client disconnected: " + this);
145 log.debug("Disconnect cuase: ", jmsEx);
146 close();
147 }
148
149 /**
150 * @return pretty print for this brokerConnector-client
151 */
152 public String toString() {
153 String str = "brokerConnector-client:(" + hashCode() + ") ";
154 str += connectionInfo == null ? "" : connectionInfo.getClientId();
155 str += ": " + channel;
156 return str;
157 }
158
159 /**
160 * Dispatch an ActiveMQMessage to the end client
161 *
162 * @param message
163 */
164 public void dispatch(ActiveMQMessage message) {
165 if (!isSlowConsumer()) {
166 dispatchToClient(message);
167 }
168 else {
169 if (spoolQueue == null) {
170 log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
171 String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
172 try {
173 spoolQueue = new SpooledBoundedActiveMQMessageQueue(brokerConnector.getBrokerContainer().getBroker()
174 .getTempDir(), spoolName);
175 final SpooledBoundedActiveMQMessageQueue bpq = spoolQueue;
176 ThreadedExecutor exec = new ThreadedExecutor();
177 exec.execute(new Runnable() {
178 public void run() {
179 while (!closed.get()) {
180 try {
181 Packet packet = bpq.dequeue();
182 if (packet != null) {
183 dispatchToClient(packet);
184 }
185 }
186 catch (InterruptedException e) {
187 log.warn("async dispatch got an interupt", e);
188 }
189 catch (JMSException e) {
190 log.error("async dispatch got an problem", e);
191 }
192 }
193 }
194 });
195 }
196 catch (IOException e) {
197 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
198 close();
199 }
200 catch (InterruptedException e) {
201 log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
202 close();
203 }
204 }
205 if (spoolQueue != null) {
206 try {
207 spoolQueue.enqueue(message);
208 }
209 catch (JMSException e) {
210 log.error(
211 "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
212 e);
213 close();
214 }
215 }
216 }
217 }
218
219 private void dispatchToClient(Packet message) {
220 if (started.get()) {
221 send(message);
222
223 }
224 else {
225 boolean msgSent = false;
226 if (message.isJMSMessage()) {
227 ActiveMQMessage jmsMsg = (ActiveMQMessage) message;
228 if (jmsMsg.getJMSActiveMQDestination().isAdvisory()) {
229 send(message);
230 msgSent = true;
231 }
232 }
233 if (!msgSent) {
234 // If the connection is stopped.. we have to hold the message till it is started.
235 synchronized (started) {
236 dispatchQueue.add(message);
237 }
238 }
239 }
240 }
241
242 /**
243 * @return true if the peer for this Client is itself another Broker
244 */
245 public boolean isBrokerConnection() {
246 return brokerConnection;
247 }
248
249 /**
250 * @return true id this client is part of a cluster
251 */
252 public boolean isClusteredConnection() {
253 return clusteredConnection;
254 }
255
256 /**
257 * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
258 * capacity representing that the peer cannot process any more messages at the current time
259 *
260 * @return
261 */
262 public int getCapacity() {
263 return capacity;
264 }
265
266 /**
267 * @return the client id of the remote connection
268 */
269 public String getClientID() {
270 if (connectionInfo != null) {
271 return connectionInfo.getClientId();
272 }
273 return null;
274 }
275
276 /**
277 * @return the channel used
278 */
279 public TransportChannel getChannel() {
280 return channel;
281 }
282
283 /**
284 * Get an indication if the peer should be considered as a slow consumer
285 *
286 * @return true id the peer should be considered as a slow consumer
287 */
288 public boolean isSlowConsumer() {
289 return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
290 }
291
292 /**
293 * Consume a Packet from the underlying TransportChannel for processing
294 *
295 * @param packet
296 */
297 public void consume(Packet packet) {
298 if (packet != null) {
299
300 if( commandLog.isDebugEnabled() )
301 commandLog.debug("broker for "+getClientID()+" received: "+packet);
302
303 Throwable requestEx = null;
304 boolean failed = false;
305 boolean receiptRequired = packet.isReceiptRequired();
306 short correlationId = packet.getId();
307 String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
308 String clusterName = brokerConnector.getBrokerInfo().getClusterName();
309 try {
310 if (brokerConnection) {
311 if (remoteBrokerName != null && remoteBrokerName.length() > 0) {
312 packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
313 }
314 packet.addBrokerVisited(brokerName);
315 }
316 if (packet.isJMSMessage()) {
317 ActiveMQMessage message = (ActiveMQMessage) packet;
318
319 if (!brokerConnection) {
320 message.setEntryBrokerName(brokerName);
321 message.setEntryClusterName(clusterName);
322 }
323 consumeActiveMQMessage(message);
324 }
325 else {
326 switch (packet.getPacketType()) {
327 case Packet.ACTIVEMQ_MSG_ACK : {
328 MessageAck ack = (MessageAck) packet;
329 consumeMessageAck(ack);
330 break;
331 }
332 case Packet.XA_TRANSACTION_INFO : {
333 XATransactionInfo info = (XATransactionInfo) packet;
334 consumeXATransactionInfo(info);
335 receiptRequired=info.isReceiptRequired();
336 break;
337 }
338 case Packet.TRANSACTION_INFO : {
339 TransactionInfo info = (TransactionInfo) packet;
340 consumeTransactionInfo(info);
341 break;
342 }
343 case Packet.CONSUMER_INFO : {
344 ConsumerInfo info = (ConsumerInfo) packet;
345 consumeConsumerInfo(info);
346 break;
347 }
348 case Packet.PRODUCER_INFO : {
349 ProducerInfo info = (ProducerInfo) packet;
350 consumeProducerInfo(info);
351 break;
352 }
353 case Packet.SESSION_INFO : {
354 SessionInfo info = (SessionInfo) packet;
355 consumeSessionInfo(info);
356 break;
357 }
358 case Packet.ACTIVEMQ_CONNECTION_INFO : {
359 ConnectionInfo info = (ConnectionInfo) packet;
360 consumeConnectionInfo(info);
361 break;
362 }
363 case Packet.DURABLE_UNSUBSCRIBE : {
364 DurableUnsubscribe ds = (DurableUnsubscribe) packet;
365 brokerConnector.durableUnsubscribe(this, ds);
366 break;
367 }
368 case Packet.CAPACITY_INFO : {
369 CapacityInfo info = (CapacityInfo) packet;
370 consumeCapacityInfo(info);
371 break;
372 }
373 case Packet.CAPACITY_INFO_REQUEST : {
374 updateCapacityInfo(packet.getId());
375 break;
376 }
377 case Packet.ACTIVEMQ_BROKER_INFO : {
378 consumeBrokerInfo((BrokerInfo) packet);
379 break;
380 }
381 case Packet.KEEP_ALIVE : {
382 // Ignore as the packet contains no additional information to consume
383 break;
384 }
385 case Packet.BROKER_ADMIN_COMMAND : {
386 consumeBrokerAdminCommand((BrokerAdminCommand) packet);
387 break;
388 }
389 case Packet.CLEANUP_CONNECTION_INFO : {
390 consumeCleanupConnectionInfo((CleanupConnectionInfo) packet);
391 break;
392 }
393 default : {
394 log.warn("Unknown Packet received: " + packet);
395 break;
396 }
397 }
398 }
399 }
400 catch (Throwable e) {
401 requestEx = e;
402 log.warn("caught exception consuming packet: " + packet, e);
403 failed = true;
404 }
405 if (receiptRequired){
406 sendReceipt(correlationId, requestEx, failed);
407 }
408 }
409 }
410
411 /**
412 * @param cleanupInfo
413 * @throws JMSException
414 */
415 private void consumeCleanupConnectionInfo(CleanupConnectionInfo cleanupInfo) throws JMSException {
416 try {
417
418 for (Iterator i = consumers.iterator(); i.hasNext();) {
419 ConsumerInfo info = (ConsumerInfo) i.next();
420 info.setStarted(false);
421 this.brokerConnector.deregisterMessageConsumer(this, info);
422 }
423 for (Iterator i = producers.iterator(); i.hasNext();) {
424 ProducerInfo info = (ProducerInfo) i.next();
425 info.setStarted(false);
426 this.brokerConnector.deregisterMessageProducer(this, info);
427 }
428 for (Iterator i = sessions.iterator(); i.hasNext();) {
429 SessionInfo info = (SessionInfo) i.next();
430 info.setStarted(false);
431 this.brokerConnector.deregisterSession(this, info);
432 }
433 for (Iterator i = transactions.iterator(); i.hasNext();) {
434 this.brokerConnector.rollbackTransaction(this, i.next().toString());
435 }
436 this.brokerConnector.deregisterClient(this, connectionInfo);
437 registered = false;
438
439 } finally {
440 // whatever happens, lets make sure we unregister & clean things
441 // down
442 if (log.isDebugEnabled()) {
443 log.debug(this + " has stopped");
444 }
445 this.consumers.clear();
446 this.producers.clear();
447 this.transactions.clear();
448 this.sessions.clear();
449 }
450
451 }
452
453 /**
454 * @param command
455 * @throws JMSException
456 */
457 private void consumeBrokerAdminCommand(BrokerAdminCommand command) throws JMSException {
458 BrokerAdmin brokerAdmin = brokerConnector.getBrokerContainer().getBroker().getBrokerAdmin();
459 if (BrokerAdminCommand.CREATE_DESTINATION.equals(command.getCommand())) {
460 brokerAdmin.createMessageContainer(command.getDestination());
461 }
462 else if (BrokerAdminCommand.DESTROY_DESTINATION.equals(command.getCommand())) {
463 brokerAdmin.destoryMessageContainer(command.getDestination());
464 }
465 else if (BrokerAdminCommand.EMPTY_DESTINATION.equals(command.getCommand())) {
466 brokerAdmin.getMessageContainerAdmin(command.getDestination()).empty();
467 }
468 else if (BrokerAdminCommand.SHUTDOWN_SERVER_VM.equals(command.getCommand())) {
469 if (Boolean.getBoolean("enable.vm.shutdown")) {
470 log.info("processing command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM);
471 System.exit(1);
472 } else
473 {
474 log.warn("ignoring command=" + BrokerAdminCommand.SHUTDOWN_SERVER_VM + ", enable.vm.shutdown=false");
475 }
476 }
477 else {
478 throw new JMSException("Broker Admin Command type: " + command.getCommand() + " not recognized.");
479 }
480 }
481
482 /**
483 * Register/deregister MessageConsumer with the Broker
484 *
485 * @param info
486 * @throws JMSException
487 */
488 public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
489 String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
490 if (info.isStarted()) {
491 consumers.add(info);
492 if (this.activeConsumers.add(info)) {
493 this.brokerConnector.registerMessageConsumer(this, info);
494 }
495 }
496 else {
497 consumers.remove(info);
498 if (activeConsumers.remove(info)) {
499 this.brokerConnector.deregisterMessageConsumer(this, info);
500 }
501 }
502 }
503
504 /**
505 * Update the peer Connection about the Broker's capacity for messages
506 *
507 * @param capacity
508 */
509 public void updateBrokerCapacity(int capacity) {
510 CapacityInfo info = new CapacityInfo();
511 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
512 info.setCapacity(capacity);
513 info.setFlowControlTimeout(getFlowControlTimeout(capacity));
514 send(info);
515 }
516
517 /**
518 * register with the Broker
519 *
520 * @param info
521 * @throws JMSException
522 */
523 public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
524 this.connectionInfo = info;
525 if (info.isClosed()) {
526 try {
527 cleanUp();
528 if (info.isReceiptRequired()){
529 sendReceipt(info.getId(), null, false);
530 }
531 info.setReceiptRequired(false);
532 try {
533 Thread.sleep(500);
534 }
535 catch (Throwable e) {
536 }
537 }
538 finally {
539 close();
540 }
541 }
542 else {
543 if (!registered) {
544 this.brokerConnector.registerClient(this, info);
545 registered = true;
546 }
547 synchronized (started) {
548 //set transport hint
549 if (info.getProperties() != null && info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY) != null){
550 boolean noDelay = new Boolean(info.getProperties().getProperty(ConnectionInfo.NO_DELAY_PROPERTY)).booleanValue();
551 channel.setNoDelay(noDelay);
552
553 }
554 if (!started.get() && info.isStarted()) {
555 started.set(true);
556 // Dispatch any queued
557 log.debug(this + " has started running client version " + info.getClientVersion()
558 + " , wire format = " + info.getWireFormatVersion());
559 //go through consumers, producers, and sessions - setting their clientId (which might not have been set)
560 for (Iterator i = consumers.iterator();i.hasNext();) {
561 ConsumerInfo ci = (ConsumerInfo) i.next();
562 ci.setClientId(info.getClientId());
563 }
564 for (Iterator i = producers.iterator();i.hasNext();) {
565 ProducerInfo pi = (ProducerInfo) i.next();
566 pi.setClientId(info.getClientId());
567 }
568 for (Iterator i = sessions.iterator();i.hasNext();) {
569 SessionInfo si = (SessionInfo) i.next();
570 si.setClientId(info.getClientId());
571 }
572 for (int i = 0;i < dispatchQueue.size();i++) {
573 ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i);
574 dispatch(msg);
575 }
576 dispatchQueue.clear();
577 }
578 if (started.get() && !info.isStarted()) {
579 started.set(false);
580 log.debug(this + " has stopped");
581 }
582 }
583 }
584 }
585
586 /**
587 * start consuming messages
588 *
589 * @throws JMSException
590 */
591 public void start() throws JMSException {
592 channel.start();
593 }
594
595 /**
596 * stop consuming messages
597 *
598 * @throws JMSException
599 */
600 public void stop() throws JMSException {
601 log.trace("Stopping channel: " + channel);
602 channel.stop();
603 }
604
605 /**
606 * cleanup
607 */
608 public synchronized void cleanUp() {
609 // we could be called here from 2 different code paths
610 // based on if we get a transport failure or we do a clean shutdown
611 // so lets only run this stuff once
612 if (!cleanedUp) {
613 cleanedUp = true;
614 try {
615 try {
616 for (Iterator i = consumers.iterator();i.hasNext();) {
617 ConsumerInfo info = (ConsumerInfo) i.next();
618 info.setStarted(false);
619 this.brokerConnector.deregisterMessageConsumer(this, info);
620 }
621 for (Iterator i = producers.iterator();i.hasNext();) {
622 ProducerInfo info = (ProducerInfo) i.next();
623 info.setStarted(false);
624 this.brokerConnector.deregisterMessageProducer(this, info);
625 }
626 for (Iterator i = sessions.iterator();i.hasNext();) {
627 SessionInfo info = (SessionInfo) i.next();
628 info.setStarted(false);
629 this.brokerConnector.deregisterSession(this, info);
630 }
631 for (Iterator i = transactions.iterator();i.hasNext();) {
632 this.brokerConnector.rollbackTransaction(this, i.next().toString());
633 }
634 }
635 finally {
636 // whatever happens, lets make sure we unregister & clean things down
637 if (log.isDebugEnabled()) {
638 log.debug(this + " has stopped");
639 }
640 this.consumers.clear();
641 this.producers.clear();
642 this.transactions.clear();
643 this.sessions.clear();
644 this.brokerConnector.deregisterClient(this, connectionInfo);
645 registered = false;
646 }
647 }
648 catch (JMSException e) {
649 log.warn("failed to de-register Broker client: " + e, e);
650 }
651 }
652 else {
653 log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
654 }
655 }
656
657 // Implementation methods
658 //-------------------------------------------------------------------------
659 protected void send(Packet packet) {
660 if (!closed.get()) {
661 try {
662 if (brokerConnection) {
663 String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
664 packet.addBrokerVisited(brokerName);
665 if (packet.hasVisited(remoteBrokerName)) {
666 if (log.isDebugEnabled()) {
667 log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
668 + packet);
669 }
670 return;
671 }
672 }
673 packet.setId(this.packetIdGenerator.getNextShortSequence());
674 if( commandLog.isDebugEnabled() )
675 commandLog.debug("broker for "+getClientID()+" sending: "+packet);
676 this.channel.asyncSend(packet);
677 }
678 catch (JMSException e) {
679 log.warn(this + " caught exception ", e);
680 close();
681 }
682 }
683 }
684
685 /**
686 * validate the connection
687 * @param timeout
688 * @throws JMSException
689 */
690 public void validateConnection(int timeout) throws JMSException {
691 KeepAlive packet = new KeepAlive();
692 packet.setReceiptRequired(true);
693 packet.setId(this.packetIdGenerator.getNextShortSequence());
694 // In most cases, if the transport is dead due to network errors
695 // the network error will be recognised immediately and an exception
696 // thrown. If the duplicate client ids are due to misconfiguration,
697 // we make sure that we do not terminate the "right" connection
698 // prematurely by using a long timeout here. If the existing client
699 // is working heavily and/or over a slow link, it might take some time
700 // for it to respond. In such a case, the new client is misconfigured
701 // and can wait for a while before being kicked out.
702
703 Receipt r = getChannel().send(packet, timeout);
704 if (r == null) throw new JMSException("Client did not respond in time");
705
706 }
707
708 protected void close() {
709 if (closed.commit(false, true)) {
710 this.channel.stop();
711 log.debug(this + " has closed");
712 }
713 }
714
715 /**
716 * Send message to Broker
717 *
718 * @param message
719 * @throws JMSException
720 */
721 private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
722 this.brokerConnector.sendMessage(this, message);
723 }
724
725 /**
726 * Send Message acknowledge to the Broker
727 *
728 * @param ack
729 * @throws JMSException
730 */
731 private void consumeMessageAck(MessageAck ack) throws JMSException {
732 this.brokerConnector.acknowledgeMessage(this, ack);
733 }
734
735 /**
736 * Handle transaction start/commit/rollback
737 *
738 * @param info
739 * @throws JMSException
740 */
741 private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
742 if (info.getType() == TransactionInfo.START) {
743 transactions.add(info.getTransactionId());
744 this.brokerConnector.startTransaction(this, info.getTransactionId());
745 }
746 else {
747 if (info.getType() == TransactionInfo.ROLLBACK) {
748 this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
749 }
750 else if (info.getType() == TransactionInfo.COMMIT) {
751 this.brokerConnector.commitTransaction(this, info.getTransactionId());
752 }
753 transactions.remove(info.getTransactionId());
754 }
755 }
756
757 /**
758 * Handle XA transaction start/prepare/commit/rollback
759 *
760 * @param info
761 * @throws JMSException
762 * @throws XAException
763 */
764 private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
765 if (info.getType() == XATransactionInfo.START) {
766 this.brokerConnector.startTransaction(this, info.getXid());
767 }
768 else if (info.getType() == XATransactionInfo.XA_RECOVER) {
769 ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
770 if( info.isReceiptRequired()) {
771 // We will be sending our own receipt..
772 info.setReceiptRequired(false);
773 // Send the receipt..
774 ResponseReceipt receipt = new ResponseReceipt();
775 receipt.setCorrelationId(info.getId());
776 receipt.setResult(rc);
777 send(receipt);
778 }
779 }
780 else if (info.getType() == XATransactionInfo.GET_RM_ID) {
781 String rc = this.brokerConnector.getResourceManagerId(this);
782 if( info.isReceiptRequired()) {
783 // We will be sending our own receipt..
784 info.setReceiptRequired(false);
785 // Send the receipt..
786 ResponseReceipt receipt = new ResponseReceipt();
787 receipt.setId(this.packetIdGenerator.getNextShortSequence());
788 receipt.setCorrelationId(info.getId());
789 receipt.setResult(rc);
790 send(receipt);
791 }
792 }
793 else if (info.getType() == XATransactionInfo.END) {
794 // we don't do anything..
795 }
796 else {
797 if (info.getType() == XATransactionInfo.PRE_COMMIT) {
798 int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
799 // We will be sending our own receipt..
800 if( info.isReceiptRequired()) {
801 info.setReceiptRequired(false);
802 // Send the receipt..
803 IntResponseReceipt receipt = new IntResponseReceipt();
804 receipt.setId(this.packetIdGenerator.getNextShortSequence());
805 receipt.setCorrelationId(info.getId());
806 receipt.setResult(rc);
807 send(receipt);
808 }
809 }
810 else if (info.getType() == XATransactionInfo.ROLLBACK) {
811 this.brokerConnector.rollbackTransaction(this, info.getXid());
812 }
813 else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
814 this.brokerConnector.commitTransaction(this, info.getXid(), true);
815 }
816 else if (info.getType() == XATransactionInfo.COMMIT) {
817 this.brokerConnector.commitTransaction(this, info.getXid(), false);
818 }
819 else {
820 throw new JMSException("Packet type: " + info.getType() + " not recognized.");
821 }
822 }
823 }
824
825 /**
826 * register/deregister MessageProducer in the Broker
827 *
828 * @param info
829 * @throws JMSException
830 */
831 private void consumeProducerInfo(ProducerInfo info) throws JMSException {
832 if (info.isStarted()) {
833 producers.add(info);
834 this.brokerConnector.registerMessageProducer(this, info);
835 }
836 else {
837 producers.remove(info);
838 this.brokerConnector.deregisterMessageProducer(this, info);
839 }
840 }
841
842 /**
843 * register/deregister Session in a Broker
844 *
845 * @param info
846 * @throws JMSException
847 */
848 private void consumeSessionInfo(SessionInfo info) throws JMSException {
849 if (info.isStarted()) {
850 sessions.add(info);
851 this.brokerConnector.registerSession(this, info);
852 }
853 else {
854 sessions.remove(info);
855 this.brokerConnector.deregisterSession(this, info);
856 }
857 }
858
859 /**
860 * Update capacity for the peer
861 *
862 * @param info
863 */
864 private void consumeCapacityInfo(CapacityInfo info) {
865 this.capacity = info.getCapacity();
866 }
867
868 private void updateCapacityInfo(short correlationId) {
869 CapacityInfo info = new CapacityInfo();
870 info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
871 info.setCorrelationId(correlationId);
872 info.setCapacity(this.brokerConnector.getBrokerCapacity());
873 info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
874 send(info);
875 }
876
877 private long getFlowControlTimeout(int capacity) {
878 long result = -1;
879 if (capacity <= 0) {
880 result = 10000;
881 }
882 else if (capacity <= 10) {
883 result = 1000;
884 }
885 else if (capacity <= 20) {
886 result = 10;
887 }
888 return result;
889 }
890
891 private void consumeBrokerInfo(final BrokerInfo info) {
892 brokerConnection = true;
893 started.set(true);
894 remoteBrokerName = info.getBrokerName();
895 if (remoteBrokerName == null || remoteBrokerName.length() == 0) {
896 log.warn("No remote broker name available!");
897 }
898 else {
899 if (log.isDebugEnabled()) {
900 log.debug("Received broker info from: " + remoteBrokerName + " on client: " + channel);
901 }
902 }
903 String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
904 if (clusterName.equals(info.getClusterName())) {
905 clusteredConnection = true;
906 }
907 if (!remoteNetworkConnector && info.isRemote()) {
908 try {
909 final NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer());
910 networkConnector.getThreadPool().execute(new Runnable() {
911 public void run() {
912 try {
913 NetworkChannel networkChannel = new NetworkChannel(networkConnector, brokerConnector
914 .getBrokerContainer(), channel, info.getBrokerName(), info.getClusterName());
915 networkConnector.addNetworkChannel(networkChannel);
916 brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector);
917 networkConnector.start();
918 }
919 catch (JMSException e) {
920 log.error("Failed to create reverse remote channel", e);
921 }
922 }
923 });
924 log.info("Started reverse remote channel to " + remoteBrokerName);
925 remoteNetworkConnector = true;
926 }
927 catch (InterruptedException e) {
928 log.error("Failed to create reverse remote channel", e);
929 }
930 }
931 }
932
933
934 private void sendReceipt(short correlationId, Throwable requestEx, boolean failed) {
935 Receipt receipt = new Receipt();
936 receipt.setCorrelationId(correlationId);
937 receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
938 receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
939 receipt.setException(requestEx);
940 receipt.setFailed(failed);
941 send(receipt);
942 }
943
944 /**
945 * @param subject
946 */
947 public void setSubject(Subject subject) {
948 this.subject = subject;
949 }
950
951 /**
952 * @return the subject
953 */
954 public Subject getSubject() {
955 return subject;
956 }
957 }