001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service.boundedvm;
020 import java.util.List;
021
022 import javax.jms.DeliveryMode;
023 import javax.jms.JMSException;
024
025 import org.activemq.broker.BrokerClient;
026 import org.activemq.filter.Filter;
027 import org.activemq.io.util.MemoryBoundedQueue;
028 import org.activemq.message.ActiveMQMessage;
029 import org.activemq.message.ConsumerInfo;
030
031 /**
032 * A holder for Transient Queue consumer info and message routing
033 *
034 * @version $Revision: 1.1.1.1 $
035 */
036 public class TransientQueueSubscription extends TransientSubscription {
037
038 private MemoryBoundedQueue dispatchedQueue;
039 private MemoryBoundedQueue ackedQueue; // Where messages go that are acked in a transaction
040
041 /**
042 * Construct the TransientQueueSubscription
043 *
044 * @param client
045 * @param dispatchedQueue
046 * @param ackQueue
047 * @param filter
048 * @param info
049 */
050 public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, MemoryBoundedQueue ackQueue, Filter filter,
051 ConsumerInfo info) {
052 super(filter, info, client);
053 this.dispatchedQueue = dispatchedQueue;
054 this.ackedQueue = ackQueue;
055 }
056
057 /**
058 * determines if the Subscription is interested in the message
059 *
060 * @param message
061 * @return true if this Subscription will accept the message
062 * @throws JMSException
063 */
064 public boolean isTarget(ActiveMQMessage message) throws JMSException {
065 boolean result = false;
066 if (message != null) {
067 //make sure we don't loop messages around the cluster
068 if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
069 || message.isEntryBroker(brokerName)) {
070 result = filter.matches(message)
071 && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
072 .getDestination().isTemporary());
073 }
074 }
075 return result;
076 }
077
078 /**
079 * @return true if the consumer has capacity for more messages
080 */
081 public boolean canAcceptMessages() {
082 return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
083 }
084
085 /**
086 * Dispatch a message to the Consumer
087 *
088 * @param message
089 * @throws JMSException
090 */
091 public void doDispatch(ActiveMQMessage message) throws JMSException {
092 dispatchedQueue.enqueueNoBlock(message);
093 message = message.shallowCopy();
094 message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
095 client.dispatch(message);
096 }
097
098
099 /**
100 * Acknowledge the receipt of a message by a consumer
101 *
102 * @param id
103 * @return the removed ActiveMQMessage with the associated id
104 */
105 public ActiveMQMessage acknowledgeMessage(String id) {
106 ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
107 return msg;
108 }
109
110 /**
111 * @return all the unacknowledge messages
112 */
113 public List getUndeliveredMessages() {
114 return dispatchedQueue.getContents();
115 }
116
117 /**
118 * close the subscription
119 */
120 public void close() {
121 super.close();
122 dispatchedQueue.close();
123 ackedQueue.close();
124 }
125
126 /**
127 * Add an acked message.
128 */
129 public boolean hasAckedMessage() {
130 return !ackedQueue.isEmpty();
131 }
132
133 /**
134 * Add an acked message.
135 *
136 * @throws InterruptedException
137 */
138 public void addAckedMessage(ActiveMQMessage message) {
139 ackedQueue.enqueueNoBlock(message);
140 }
141
142 /**
143 * Get a list of all the acked messages
144 */
145 public List listAckedMessages() {
146 return ackedQueue.getContents();
147 }
148
149 /**
150 * Add an acked message.
151 */
152 public void removeAllAckedMessages() {
153 ackedQueue.clear();
154 }
155
156 public boolean isBrowser() {
157 return consumerInfo.isBrowser();
158 }
159 }