001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020
021 import javax.jms.ConnectionConsumer;
022 import javax.jms.IllegalStateException;
023 import javax.jms.JMSException;
024 import javax.jms.ServerSession;
025 import javax.jms.ServerSessionPool;
026 import javax.jms.Session;
027
028 import org.activemq.io.util.MemoryBoundedQueue;
029 import org.activemq.message.ActiveMQMessage;
030 import org.activemq.message.ConsumerInfo;
031
032 /**
033 * For application servers, <CODE>Connection</CODE> objects provide a special
034 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
035 * messages it is to consume are specified by a <CODE>Destination</CODE> and a
036 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
037 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
038 * <p/>
039 * <P>
040 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
041 * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
042 * and starts it. As traffic picks up, messages can back up. If this happens, a
043 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
044 * with more than one message. This reduces the thread context switches and
045 * minimizes resource use at the expense of some serialization of message
046 * processing.
047 *
048 * @see javax.jms.Connection#createConnectionConsumer
049 * @see javax.jms.Connection#createDurableConnectionConsumer
050 * @see javax.jms.QueueConnection#createConnectionConsumer
051 * @see javax.jms.TopicConnection#createConnectionConsumer
052 * @see javax.jms.TopicConnection#createDurableConnectionConsumer
053 */
054
055 public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {
056
057 private ActiveMQConnection connection;
058
059 private ServerSessionPool sessionPool;
060
061 private ConsumerInfo consumerInfo;
062
063 private boolean closed;
064
065 protected MemoryBoundedQueue messageQueue;
066
067 /**
068 * Create a ConnectionConsumer
069 *
070 * @param theConnection
071 * @param theSessionPool
072 * @param theConsumerInfo
073 * @param theMaximumMessages
074 * @throws JMSException
075 */
076 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool,
077 ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
078 this.connection = theConnection;
079 this.sessionPool = theSessionPool;
080 this.consumerInfo = theConsumerInfo;
081 this.connection.addConnectionConsumer(this);
082 this.consumerInfo.setStarted(true);
083 this.consumerInfo.setPrefetchNumber(theMaximumMessages);
084 this.connection.syncSendPacket(this.consumerInfo);
085
086 String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
087 + theConsumerInfo.getConsumerNo();
088 this.messageQueue = connection.getMemoryBoundedQueue(queueName);
089 }
090
091 /**
092 * Tests to see if the Message Dispatcher is a target for this message
093 *
094 * @param message
095 * the message to test
096 * @return true if the Message Dispatcher can dispatch the message
097 */
098 public boolean isTarget(ActiveMQMessage message) {
099 return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
100 }
101
102 /**
103 * Dispatch an ActiveMQMessage
104 *
105 * @param message
106 */
107 public void dispatch(ActiveMQMessage message) {
108 if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
109 message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
110 message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
111 && !this.consumerInfo.getDestination().isQueue());
112 try {
113 if (sessionPool != null)
114 dispatchToSession(message);
115 else
116 dispatchToQueue(message);
117 } catch (JMSException jmsEx) {
118 this.connection.handleAsyncException(jmsEx);
119 }
120 }
121 }
122
123 /**
124 * @param message
125 * @throws JMSException
126 */
127 private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
128 messageQueue.enqueue(message);
129 }
130
131 /**
132 * Receives the next message that arrives within the specified timeout
133 * interval.
134 *
135 * @throws JMSException
136 */
137 public ActiveMQMessage receive(long timeout) throws JMSException {
138 try {
139 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
140 return message;
141 } catch (InterruptedException ioe) {
142 return null;
143 }
144 }
145
146 /**
147 * @param message
148 * @throws JMSException
149 */
150 private void dispatchToSession(ActiveMQMessage message) throws JMSException {
151
152 ServerSession serverSession = sessionPool.getServerSession();
153 Session nestedSession = serverSession.getSession();
154 ActiveMQSession session = null;
155 if (nestedSession instanceof ActiveMQSession) {
156 session = (ActiveMQSession) nestedSession;
157 } else if (nestedSession instanceof ActiveMQTopicSession) {
158 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
159 session = (ActiveMQSession) topicSession.getNext();
160 } else if (nestedSession instanceof ActiveMQQueueSession) {
161 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
162 session = (ActiveMQSession) queueSession.getNext();
163 } else {
164 throw new JMSException("Invalid instance of session obtained from server session." +
165 "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
166 "Found instance of " + nestedSession.getClass().getName());
167 }
168 session.dispatch(message);
169 serverSession.start();
170 }
171
172 /**
173 * Gets the server session pool associated with this connection consumer.
174 *
175 * @return the server session pool used by this connection consumer
176 * @throws JMSException
177 * if the JMS provider fails to get the server session pool
178 * associated with this consumer due to some internal error.
179 */
180
181 public ServerSessionPool getServerSessionPool() throws JMSException {
182 if (closed) {
183 throw new IllegalStateException("The Connection Consumer is closed");
184 }
185 return this.sessionPool;
186 }
187
188 /**
189 * Closes the connection consumer. <p/>
190 * <P>
191 * Since a provider may allocate some resources on behalf of a connection
192 * consumer outside the Java virtual machine, clients should close these
193 * resources when they are not needed. Relying on garbage collection to
194 * eventually reclaim these resources may not be timely enough.
195 *
196 * @throws JMSException
197 */
198
199 public void close() throws JMSException {
200 if (!closed) {
201 closed = true;
202 this.consumerInfo.setStarted(false);
203 this.connection.asyncSendPacket(this.consumerInfo);
204 this.connection.removeConnectionConsumer(this);
205 }
206
207 }
208 }