001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020
021 import java.util.Enumeration;
022
023 import javax.jms.IllegalStateException;
024 import javax.jms.JMSException;
025 import javax.jms.Queue;
026 import javax.jms.QueueBrowser;
027
028 import org.activemq.message.ActiveMQQueue;
029
030 /**
031 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
032 * queue without removing them.
033 * <p/>
034 * <P>
035 * The <CODE>getEnumeration</CODE> method returns a <CODE>
036 * java.util.Enumeration</CODE> that is used to scan the queue's messages. It
037 * may be an enumeration of the entire content of a queue, or it may contain
038 * only the messages matching a message selector.
039 * <p/>
040 * <P>
041 * Messages may be arriving and expiring while the scan is done. The JMS API
042 * does not require the content of an enumeration to be a static snapshot of
043 * queue content. Whether these changes are visible or not depends on the JMS
044 * provider.
045 * <p/>
046 * <P>
047 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
048 * </CODE> or a <CODE>QueueSession</CODE>.
049 *
050 * @see javax.jms.Session#createBrowser
051 * @see javax.jms.QueueSession#createBrowser
052 * @see javax.jms.QueueBrowser
053 * @see javax.jms.QueueReceiver
054 */
055
056 public class ActiveMQQueueBrowser implements
057 QueueBrowser, Enumeration {
058
059 private final ActiveMQSession session;
060 private final ActiveMQQueue destination;
061 private final String selector;
062 private final int cnum;
063
064 private ActiveMQMessageConsumer consumer;
065 private boolean closed;
066
067 /**
068 * Constructor for an ActiveMQQueueBrowser - used internally
069 *
070 * @param theSession
071 * @param dest
072 * @param selector
073 * @param cnum
074 * @throws JMSException
075 */
076 protected ActiveMQQueueBrowser(ActiveMQSession session, ActiveMQQueue destination, String selector, int cnum) throws JMSException {
077 this.session = session;
078 this.destination = destination;
079 this.selector = selector;
080 this.cnum = cnum;
081 consumer = createConsumer();
082 }
083
084 /**
085 * @param session
086 * @param destination
087 * @param selector
088 * @param cnum
089 * @return
090 * @throws JMSException
091 */
092 private ActiveMQMessageConsumer createConsumer() throws JMSException {
093 return new ActiveMQMessageConsumer(session, destination, "", selector, cnum, session.connection.getPrefetchPolicy().getQueueBrowserPrefetch(), false, true);
094 }
095
096 private void destroyConsumer() {
097 if( consumer == null )
098 return;
099
100 try {
101 consumer.close();
102 consumer=null;
103 } catch (JMSException e) {
104 e.printStackTrace();
105 }
106 }
107
108 /**
109 * Gets an enumeration for browsing the current queue messages in the order
110 * they would be received.
111 *
112 * @return an enumeration for browsing the messages
113 * @throws JMSException if the JMS provider fails to get the enumeration for this
114 * browser due to some internal error.
115 */
116
117 public Enumeration getEnumeration() throws JMSException {
118 checkClosed();
119
120 if( consumer==null )
121 consumer = createConsumer();
122
123 //ok - started browsing - wait for inbound messages
124 if (consumer.messageQueue.size() == 0) {
125 try {
126 Thread.sleep(1000);
127 }
128 catch (InterruptedException e) {
129 }
130 }
131 return this;
132 }
133
134 private void checkClosed() throws IllegalStateException {
135 if (closed) {
136 throw new IllegalStateException("The Consumer is closed");
137 }
138 }
139
140 /**
141 * @return true if more messages to process
142 */
143 public boolean hasMoreElements() {
144 if( consumer==null )
145 return false;
146
147 boolean rc = consumer.messageQueue.size() > 0;
148 if( !rc ) {
149 destroyConsumer();
150 }
151 return rc;
152 }
153
154
155 /**
156 * @return the next message
157 */
158 public Object nextElement() {
159 if( consumer == null )
160 return null;
161
162 Object answer = null;
163 try {
164 answer = consumer.receiveNoWait();
165 if( answer==null ) {
166 destroyConsumer();
167 }
168 }
169 catch (JMSException e) {
170 e.printStackTrace();
171 }
172 return answer;
173 }
174
175 public void close() throws JMSException {
176 destroyConsumer();
177 closed=true;
178 }
179
180 /**
181 * Gets the queue associated with this queue browser.
182 *
183 * @return the queue
184 * @throws JMSException if the JMS provider fails to get the queue associated
185 * with this browser due to some internal error.
186 */
187
188 public Queue getQueue() throws JMSException {
189 return destination;
190 }
191
192
193 public String getMessageSelector() throws JMSException {
194 return selector;
195 }
196
197
198 }