001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service.boundedvm;
020
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import javax.jms.JMSException;

import org.activemq.broker.BrokerClient;
import org.activemq.filter.AndFilter;
import org.activemq.filter.DestinationMap;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.MessageAck;
import org.activemq.service.DeadLetterPolicy;
import org.activemq.service.MessageContainer;
import org.activemq.service.MessageContainerManager;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.service.impl.AutoCommitTransaction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
051
052 /**
053 * A MessageContainerManager for transient topics
054 *
055 * @version $Revision: 1.1.1.1 $
056 */
057
058 /**
059 * A manager of MessageContainer instances
060 */
061 public class TransientTopicBoundedMessageManager implements MessageContainerManager {
062 private static final Log log = LogFactory.getLog(TransientTopicBoundedMessageManager.class);
063 private MemoryBoundedQueueManager queueManager;
064 private ConcurrentHashMap containers;
065 private ConcurrentHashMap subscriptions;
066 private DestinationMap destinationMap;
067 private FilterFactory filterFactory;
068 private SynchronizedBoolean started;
069 private Map destinations;
070 private DeadLetterPolicy deadLetterPolicy;
071 private boolean decoupledDispatch = false;
072
073 /**
074 * Constructor for TransientTopicBoundedMessageManager
075 *
076 * @param mgr
077 */
078 public TransientTopicBoundedMessageManager(MemoryBoundedQueueManager mgr) {
079 this.queueManager = mgr;
080 this.containers = new ConcurrentHashMap();
081 this.subscriptions = new ConcurrentHashMap();
082 this.destinationMap = new DestinationMap();
083 this.destinations = new ConcurrentHashMap();
084 this.filterFactory = new FilterFactoryImpl();
085 this.started = new SynchronizedBoolean(false);
086 }
087
088 /**
089 * start the manager
090 *
091 * @throws JMSException
092 */
093 public void start() throws JMSException {
094 if (started.commit(false, true)) {
095 for (Iterator i = containers.values().iterator(); i.hasNext();) {
096 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
097 container.start();
098 }
099 }
100 }
101
102 /**
103 * stop the manager
104 *
105 * @throws JMSException
106 */
107 public void stop() throws JMSException {
108 if (started.commit(true, false)) {
109 for (Iterator i = containers.values().iterator(); i.hasNext();) {
110 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
111 container.stop();
112 }
113 }
114 }
115
116 /**
117 * Add a consumer if appropiate
118 *
119 * @param client
120 * @param info
121 * @throws JMSException
122 */
123 public synchronized void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
124 ActiveMQDestination destination = info.getDestination();
125 if (destination.isTopic()) {
126 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
127 .get(client);
128 if (container == null) {
129 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(client.toString());
130 container = new TransientTopicBoundedMessageContainer(this, client, queue);
131 containers.put(client, container);
132 if (started.get()) {
133 container.start();
134 }
135 }
136 if (log.isDebugEnabled()) {
137 log.debug("Adding consumer: " + info);
138 }
139
140 TransientTopicSubscription ts = container.addConsumer(createFilter(info), info);
141 if (ts != null) {
142 subscriptions.put(info.getConsumerId(), ts);
143 }
144
145 destinationMap.put(destination,container);
146 String name = destination.getPhysicalName();
147 //As the destinations are used for generating
148 //subscriptions for NetworkConnectors etc.,
149 //we should not generate duplicates by adding in
150 //durable topic subscribers
151 if (!info.isDurableTopic() && !destinations.containsKey(name)) {
152 destinations.put(name, destination);
153 }
154 }
155 }
156
157 /**
158 * @param client
159 * @param info
160 * @throws JMSException
161 */
162 public synchronized void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
163 ActiveMQDestination destination = info.getDestination();
164 if (destination.isTopic()) {
165 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) containers
166 .get(client);
167 if (container != null) {
168 container.removeConsumer(info);
169 if (container.isInactive()) {
170 containers.remove(client);
171 container.close();
172 destinationMap.remove(destination, container);
173 }
174
175 // lets check if we've no more consumers for this destination
176 //As the destinations are used for generating
177 //subscriptions for NetworkConnectors etc.,
178 //we should not count durable topic subscribers
179 if (!info.isDurableTopic() && !hasConsumerFor(destination)) {
180 destinations.remove(destination.getPhysicalName());
181 }
182 }
183 subscriptions.remove(info.getConsumerId());
184 }
185 }
186
187 /**
188 * Delete a durable subscriber
189 *
190 * @param clientId
191 * @param subscriberName
192 * @throws JMSException if the subscriber doesn't exist or is still active
193 */
194 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
195 }
196
197 /**
198 * @param client
199 * @param message
200 * @throws JMSException
201 */
202 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
203 if (TransactionManager.getContexTransaction()==AutoCommitTransaction.AUTO_COMMIT_TRANSACTION){
204 doSendMessage(client, message);
205 }else {
206 // If there is no transaction.. then this executes directly.
207 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
208 public void execute() throws Throwable {
209 doSendMessage(client, message);
210 }
211 });
212 }
213 }
214
215 /**
216 * @param client
217 * @param message
218 * @throws JMSException
219 */
220 private void doSendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
221 Set set = destinationMap.get(message.getJMSActiveMQDestination());
222 if (!set.isEmpty()){
223 for (Iterator i = set.iterator(); i.hasNext();) {
224 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
225 container.targetAndDispatch(client,message);
226 }
227 }
228 }
229
230 /**
231 * @param client
232 * @param ack
233 * @throws JMSException
234 *
235 */
236 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
237 }
238
239 /**
240 * @throws JMSException
241 *
242 */
243
244 public void poll() throws JMSException {
245 }
246
247 /**
248 * For Transient topics - a MessageContainer maps on to the messages
249 * to be dispatched through a BrokerClient, not a destination
250 * @param physicalName
251 * @return the MessageContainer used for dispatching - always returns null
252 * @throws JMSException
253 */
254 public MessageContainer getContainer(String physicalName) throws JMSException {
255 return null;
256 }
257
258 /**
259 * @return a map of all the destinations
260 */
261 public Map getDestinations() {
262 return Collections.unmodifiableMap(destinations);
263 }
264
265 /**
266 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
267 * objects used by non-broker consumers directly connected to this container
268 *
269 * @return
270 */
271 public Map getLocalDestinations() {
272 Map localDestinations = new HashMap();
273 for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
274 TransientTopicSubscription sub = (TransientTopicSubscription) iter.next();
275 if (sub.isLocalSubscription() && !sub.isDurableTopic()) {
276 final ActiveMQDestination dest = sub.getDestination();
277 localDestinations.put(dest.getPhysicalName(), dest);
278 }
279 }
280 return Collections.unmodifiableMap(localDestinations);
281 }
282
283 /**
284 * @return the DeadLetterPolicy for this Container Manager
285 */
286 public DeadLetterPolicy getDeadLetterPolicy(){
287 return deadLetterPolicy;
288 }
289
290 /**
291 * Set the DeadLetterPolicy for this Container Manager
292 * @param policy
293 */
294 public void setDeadLetterPolicy(DeadLetterPolicy policy){
295 this.deadLetterPolicy = policy;
296 }
297
298 /**
299 * @return Returns the decoupledDispatch.
300 */
301 public boolean isDecoupledDispatch() {
302 return decoupledDispatch;
303 }
304 /**
305 * @param decoupledDispatch The decoupledDispatch to set.
306 */
307 public void setDecoupledDispatch(boolean decoupledDispatch) {
308 this.decoupledDispatch = decoupledDispatch;
309 }
310 /**
311 * Create filter for a Consumer
312 *
313 * @param info
314 * @return the Fitler
315 * @throws javax.jms.JMSException
316 */
317 protected Filter createFilter(ConsumerInfo info) throws JMSException {
318 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
319 if (info.isNoLocal()) {
320 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
321 }
322 return filter;
323 }
324
325 protected boolean hasConsumerFor(ActiveMQDestination destination) {
326 for (Iterator i = containers.values().iterator(); i.hasNext();) {
327 TransientTopicBoundedMessageContainer container = (TransientTopicBoundedMessageContainer) i.next();
328 if (container.hasConsumerFor(destination)) {
329 return true;
330 }
331 }
332 return false;
333 }
334
335 /**
336 * @see org.activemq.service.MessageContainerManager#createMessageContainer(org.activemq.message.ActiveMQDestination)
337 */
338 public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
339 }
340
341 /**
342 * @see org.activemq.service.MessageContainerManager#destroyMessageContainer(org.activemq.message.ActiveMQDestination)
343 */
344 public void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
345 containers.remove(dest);
346 destinationMap.removeAll(dest);
347 }
348
349 /**
350 * @see org.activemq.service.MessageContainerManager#getMessageContainerAdmins()
351 */
352 public Map getMessageContainerAdmins() throws JMSException {
353 return Collections.EMPTY_MAP;
354 }
355
356 }