001 /**
002 *
003 * Copyright 2005 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.pool;
019
020 import org.activemq.ActiveMQMessageProducer;
021 import org.activemq.ActiveMQQueueSender;
022 import org.activemq.ActiveMQSession;
023 import org.activemq.ActiveMQTopicPublisher;
024 import org.activemq.AlreadyClosedException;
025 import org.activemq.util.JMSExceptionHelper;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.commons.pool.ObjectPool;
029
030 import javax.jms.*;
031 import java.io.Serializable;
032
033 /**
034 * @version $Revision: 1.1 $
035 */
036 public class PooledSession implements TopicSession, QueueSession {
037 private static final transient Log log = LogFactory.getLog(PooledSession.class);
038
039 private ActiveMQSession session;
040 private ObjectPool sessionPool;
041 private ActiveMQMessageProducer messageProducer;
042 private ActiveMQQueueSender queueSender;
043 private ActiveMQTopicPublisher topicPublisher;
044 private boolean transactional = true;
045
046 public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) {
047 this.session = aSession;
048 this.sessionPool = sessionPool;
049 this.transactional = session.isTransacted();
050 }
051
052
053 public void close() throws JMSException {
054 // TODO a cleaner way to reset??
055
056 // lets reset the session
057 getSession().setMessageListener(null);
058
059 // maybe do a rollback?
060 if (transactional) {
061 try {
062 getSession().rollback();
063 }
064 catch (JMSException e) {
065 log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
066
067 // lets close the session and not put the session back into the pool
068 try {
069 session.close();
070 }
071 catch (JMSException e1) {
072 log.trace("Ignoring exception as discarding session: " + e1, e1);
073 }
074 session = null;
075 return;
076 }
077 }
078
079 try {
080 sessionPool.returnObject(this);
081 }
082 catch (Exception e) {
083 throw JMSExceptionHelper.newJMSException("Failed to return session to pool: " + e, e);
084 }
085 }
086
087 public void commit() throws JMSException {
088 getSession().commit();
089 }
090
091 public BytesMessage createBytesMessage() throws JMSException {
092 return getSession().createBytesMessage();
093 }
094
095 public MapMessage createMapMessage() throws JMSException {
096 return getSession().createMapMessage();
097 }
098
099 public Message createMessage() throws JMSException {
100 return getSession().createMessage();
101 }
102
103 public ObjectMessage createObjectMessage() throws JMSException {
104 return getSession().createObjectMessage();
105 }
106
107 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
108 return getSession().createObjectMessage(serializable);
109 }
110
111 public Queue createQueue(String s) throws JMSException {
112 return getSession().createQueue(s);
113 }
114
115 public StreamMessage createStreamMessage() throws JMSException {
116 return getSession().createStreamMessage();
117 }
118
119 public TemporaryQueue createTemporaryQueue() throws JMSException {
120 return getSession().createTemporaryQueue();
121 }
122
123 public TemporaryTopic createTemporaryTopic() throws JMSException {
124 return getSession().createTemporaryTopic();
125 }
126
127 public void unsubscribe(String s) throws JMSException {
128 getSession().unsubscribe(s);
129 }
130
131 public TextMessage createTextMessage() throws JMSException {
132 return getSession().createTextMessage();
133 }
134
135 public TextMessage createTextMessage(String s) throws JMSException {
136 return getSession().createTextMessage(s);
137 }
138
139 public Topic createTopic(String s) throws JMSException {
140 return getSession().createTopic(s);
141 }
142
143 public int getAcknowledgeMode() throws JMSException {
144 return getSession().getAcknowledgeMode();
145 }
146
147 public boolean getTransacted() throws JMSException {
148 return getSession().getTransacted();
149 }
150
151 public void recover() throws JMSException {
152 getSession().recover();
153 }
154
155 public void rollback() throws JMSException {
156 getSession().rollback();
157 }
158
159 public void run() {
160 if (session != null) {
161 session.run();
162 }
163 }
164
165
166 // Consumer related methods
167 //-------------------------------------------------------------------------
168 public QueueBrowser createBrowser(Queue queue) throws JMSException {
169 return getSession().createBrowser(queue);
170 }
171
172 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
173 return getSession().createBrowser(queue, selector);
174 }
175
176 public MessageConsumer createConsumer(Destination destination) throws JMSException {
177 return getSession().createConsumer(destination);
178 }
179
180 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
181 return getSession().createConsumer(destination, selector);
182 }
183
184 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
185 return getSession().createConsumer(destination, selector, noLocal);
186 }
187
188 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
189 return getSession().createDurableSubscriber(topic, selector);
190 }
191
192 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
193 return getSession().createDurableSubscriber(topic, name, selector, noLocal);
194 }
195
196 public MessageListener getMessageListener() throws JMSException {
197 return getSession().getMessageListener();
198 }
199
200 public void setMessageListener(MessageListener messageListener) throws JMSException {
201 getSession().setMessageListener(messageListener);
202 }
203
204 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
205 return getSession().createSubscriber(topic);
206 }
207
208 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
209 return getSession().createSubscriber(topic, selector, local);
210 }
211
212 public QueueReceiver createReceiver(Queue queue) throws JMSException {
213 return getSession().createReceiver(queue);
214 }
215
216 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
217 return getSession().createReceiver(queue, selector);
218 }
219
220
221 // Producer related methods
222 //-------------------------------------------------------------------------
223 public MessageProducer createProducer(Destination destination) throws JMSException {
224 return new PooledProducer(getMessageProducer(), destination);
225 }
226
227 public QueueSender createSender(Queue queue) throws JMSException {
228 return new PooledQueueSender(getQueueSender(), queue);
229 }
230
231 public TopicPublisher createPublisher(Topic topic) throws JMSException {
232 return new PooledTopicPublisher(getTopicPublisher(), topic);
233 }
234
235 // Implementation methods
236 //-------------------------------------------------------------------------
237 protected ActiveMQSession getSession() throws AlreadyClosedException {
238 if (session == null) {
239 throw new AlreadyClosedException("The session has already been closed");
240 }
241 return session;
242 }
243
244 public ActiveMQMessageProducer getMessageProducer() throws JMSException {
245 if (messageProducer == null) {
246 messageProducer = (ActiveMQMessageProducer) getSession().createProducer(null);
247 }
248 return messageProducer;
249 }
250
251 public ActiveMQQueueSender getQueueSender() throws JMSException {
252 if (queueSender == null) {
253 queueSender = (ActiveMQQueueSender) getSession().createSender(null);
254 }
255 return queueSender;
256 }
257
258 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
259 if (topicPublisher == null) {
260 topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null);
261 }
262 return topicPublisher;
263 }
264
265 }