001 /**
002 *
003 * Copyright 2005 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.advisories;
019
020 import javax.jms.Connection;
021 import javax.jms.Destination;
022 import javax.jms.JMSException;
023 import javax.jms.Message;
024 import javax.jms.MessageConsumer;
025 import javax.jms.MessageListener;
026 import javax.jms.ObjectMessage;
027 import javax.jms.Session;
028
029 import org.activemq.message.ActiveMQDestination;
030 import org.activemq.message.ConsumerInfo;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
035
036 /**
037 * A ProducerDemandAdvisor is used to know when a destination is in demand.
038 *
039 * Sometimes generating messages to send to a destination is very expensive
040 * and the application would like to avoid producing messages if there are no
041 * active consumers for the destination. There is a "demand" for messages
042 * when a consumer does come active.
043 *
044 * This object uses Advisory messages to know when consumer go active and
045 * inactive.
046 */
047 public class ProducerDemandAdvisor {
048
049 private static final Log log = LogFactory.getLog(ProducerDemandAdvisor.class);
050
051 private final ActiveMQDestination destination;
052 private Connection connection;
053 private Session session;
054 private SynchronizedBoolean started = new SynchronizedBoolean(false);
055 private int consumerCount;
056 private ProducerDemandListener demandListener;
057
058 public ProducerDemandAdvisor( Connection connection, final Destination destination ) throws JMSException {
059 this.connection = connection;
060 this.destination = ActiveMQDestination.transformDestination(destination);
061 }
062
063 /**
064 * @param destination
065 */
066 private void fireDemandEvent() {
067 demandListener.onEvent( new ProducerDemandEvent(destination, isInDemand()));
068 }
069
070 public boolean isInDemand() {
071 return consumerCount>0;
072 }
073
074 public ProducerDemandListener getDemandListener() {
075 return demandListener;
076 }
077
078 synchronized public void setDemandListener(ProducerDemandListener demandListener) {
079 this.demandListener = demandListener;
080 fireDemandEvent();
081 }
082
083 public void start() throws JMSException {
084 if (started.commit(false, true)) {
085 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
086 MessageConsumer consumer = session.createConsumer(destination.getTopicForConsumerAdvisory());
087 consumer.setMessageListener(new MessageListener(){
088 public void onMessage(Message msg) {
089 process(msg);
090 }
091 });
092 }
093 }
094
095 public void stop() throws JMSException {
096 if (started.commit(true, false)) {
097 if (session != null) {
098 session.close();
099 }
100 }
101 }
102
103 protected void process(Message msg) {
104 if (msg instanceof ObjectMessage) {
105 try {
106 ConsumerInfo info = (ConsumerInfo) ((ObjectMessage) msg).getObject();
107 ConsumerAdvisoryEvent event = new ConsumerAdvisoryEvent(info);
108
109
110 boolean inDemand = isInDemand();
111
112 if ( info.isStarted() ) {
113 consumerCount++;
114 } else {
115 consumerCount--;
116 }
117
118 // Notify listener if there was a change in demand.
119 if (inDemand ^ isInDemand() && demandListener != null) {
120 fireDemandEvent();
121 }
122 } catch (JMSException e) {
123 log.error("Failed to process message: " + msg);
124 }
125 }
126 }
127
128 }