001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.service.impl;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024 import java.util.Set;
025
026 import javax.jms.DeliveryMode;
027 import javax.jms.Destination;
028 import javax.jms.JMSException;
029
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.activemq.broker.BrokerClient;
033 import org.activemq.filter.AndFilter;
034 import org.activemq.filter.DestinationMap;
035 import org.activemq.filter.Filter;
036 import org.activemq.filter.FilterFactory;
037 import org.activemq.filter.FilterFactoryImpl;
038 import org.activemq.filter.NoLocalFilter;
039 import org.activemq.message.ActiveMQDestination;
040 import org.activemq.message.ActiveMQMessage;
041 import org.activemq.message.ActiveMQQueue;
042 import org.activemq.message.ConsumerInfo;
043 import org.activemq.message.MessageAck;
044 import org.activemq.service.DeadLetterPolicy;
045 import org.activemq.service.Dispatcher;
046 import org.activemq.service.MessageContainer;
047 import org.activemq.service.QueueList;
048 import org.activemq.service.QueueListEntry;
049 import org.activemq.service.QueueMessageContainer;
050 import org.activemq.service.QueueMessageContainerManager;
051 import org.activemq.service.RedeliveryPolicy;
052 import org.activemq.service.Subscription;
053 import org.activemq.service.SubscriptionContainer;
054 import org.activemq.service.TransactionManager;
055 import org.activemq.service.TransactionTask;
056 import org.activemq.store.PersistenceAdapter;
057
058 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
059
060 /**
061 * A default Broker used for Queue messages
062 *
063 * @version $Revision: 1.1.1.1 $
064 */
065 public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport implements QueueMessageContainerManager {
066 private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);
067 private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
068
069 private PersistenceAdapter persistenceAdapter;
070 protected SubscriptionContainer subscriptionContainer;
071 protected FilterFactory filterFactory;
072 protected Map activeSubscriptions = new ConcurrentHashMap();
073 protected Map browsers = new ConcurrentHashMap();
074 protected Map messagePartSubscribers = new ConcurrentHashMap();
075 protected DestinationMap destinationMap = new DestinationMap();
076 private Object subscriptionMutex = new Object();
077
078
079
080
081 public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy,DeadLetterPolicy deadLetterPolicy) {
082 this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy,deadLetterPolicy), new FilterFactoryImpl(), new DispatcherImpl());
083 }
084
085 public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
086 super(dispatcher);
087 this.persistenceAdapter = persistenceAdapter;
088 this.subscriptionContainer = subscriptionContainer;
089 this.filterFactory = filterFactory;
090 }
091
092 /**
093 * Answers true if this ContainerManager is interested in managing the destination.
094 *
095 * @param destination
096 * @param b
097 * @return
098 */
099 private boolean isManagerFor(ActiveMQDestination destination) {
100 return destination!=null && destination.isQueue() && !destination.isTemporary();
101 }
102
103 /**
104 * Answers true if this ContainerManager is interested in handing a operation of
105 * on the provided destination. persistentOp is true when the opperation is persistent.
106 *
107 * @param destination
108 * @param persistentOp
109 * @param b
110 * @return
111 */
112 private boolean isManagerFor(ActiveMQDestination destination, boolean persistentOp) {
113 // We are going to handle both persistent and non persistent operations for now.
114 return isManagerFor(destination) && persistentOp;
115 }
116
117 /**
118 * Returns an unmodifiable map, indexed by String name, of all the {@link javax.jms.Destination}
119 * objects used by non-broker consumers directly connected to this container
120 *
121 * @return
122 */
123 public Map getLocalDestinations() {
124 Map localDestinations = new HashMap();
125 for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
126 Subscription sub = (Subscription) iter.next();
127 if (sub.isLocalSubscription()) {
128 final ActiveMQDestination dest = sub.getDestination();
129 localDestinations.put(dest.getPhysicalName(), dest);
130 }
131 }
132 return Collections.unmodifiableMap(localDestinations);
133 }
134
135 /**
136 * @param client
137 * @param info
138 * @throws javax.jms.JMSException
139 */
140 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
141
142 // Are we not intrested in handling that destination?
143 if( !isManagerFor(info.getDestination()) ) {
144 return;
145 }
146
147 if (log.isDebugEnabled()) {
148 log.debug("Adding consumer: " + info);
149 }
150
151 //ensure a matching container exists for the destination
152 getContainer(info.getDestination().getPhysicalName());
153
154 Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
155 dispatcher.addActiveSubscription(client, sub);
156 updateActiveSubscriptions(sub);
157
158 // set active last in case we end up dispatching some messages
159 // while recovering
160 sub.setActive(true);
161 }
162
163 /**
164 * @param client
165 * @param info
166 * @throws javax.jms.JMSException
167 */
168 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
169 if (log.isDebugEnabled()) {
170 log.debug("Removing consumer: " + info);
171 }
172 if (info.getDestination() != null && info.getDestination().isQueue()) {
173 synchronized (subscriptionMutex) {
174 Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
175 if (sub != null) {
176 sub.setActive(false);
177 sub.clear();//resets entries in the QueueMessageContainer
178 dispatcher.removeActiveSubscription(client, sub);
179 //need to do wildcards for this - but for now use exact matches
180 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
181 QueueMessageContainer container = (QueueMessageContainer) iter.next();
182 //should change this for wild cards ...
183 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
184 QueueList list = getSubscriptionList(container);
185 list.remove(sub);
186 if (list.isEmpty()) {
187 activeSubscriptions.remove(sub.getDestination().getPhysicalName());
188 }
189 list = getBrowserList(container);
190 list.remove(sub);
191 if (list.isEmpty()) {
192 browsers.remove(sub.getDestination().getPhysicalName());
193 }
194 }
195 }
196 }
197 }
198 }
199 }
200
201 /**
202 * Delete a durable subscriber
203 *
204 * @param clientId
205 * @param subscriberName
206 * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
207 */
208 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
209 }
210
211 /**
212 * @param client
213 * @param message
214 * @throws javax.jms.JMSException
215 */
216 public void sendMessage(final BrokerClient client, final ActiveMQMessage message) throws JMSException {
217
218 ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
219 // Are we not intrested in handling that destination?
220 if( !isManagerFor(dest, message.getJMSDeliveryMode()==DeliveryMode.PERSISTENT) ) {
221 return;
222 }
223
224 if (log.isDebugEnabled()) {
225 log.debug("Dispaching message: " + message);
226 }
227 //ensure a matching container exists for the destination
228 getContainer(((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
229 Set set = destinationMap.get(message.getJMSActiveMQDestination());
230 for (Iterator i = set.iterator();i.hasNext();) {
231 QueueMessageContainer container = (QueueMessageContainer) i.next();
232 container.addMessage(message);
233 // Once transaction has completed.. dispatch the message.
234 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
235 public void execute() throws Throwable {
236 dispatcher.wakeup();
237 updateSendStats(client, message);
238 }
239 });
240
241 }
242 }
243
244 /**
245 * Acknowledge a message as being read and consumed by the Consumer
246 *
247 * @param client
248 * @param ack
249 * @throws javax.jms.JMSException
250 */
251 public void acknowledgeMessage(final BrokerClient client, final MessageAck ack) throws JMSException {
252 // Are we not intrested in handling that destination?
253 if( !isManagerFor(ack.getDestination(), ack.isPersistent()) ) {
254 return;
255 }
256 final Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
257 if (sub == null){
258 return;
259 }
260
261 sub.messageConsumed(ack);
262 if (ack.isMessageRead()) {
263 updateAcknowledgeStats(client, sub);
264 }
265 }
266
267 /**
268 * Poll for messages
269 *
270 * @throws javax.jms.JMSException
271 */
272 public void poll() throws JMSException {
273 synchronized (subscriptionMutex) {
274 for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
275 QueueMessageContainer container = (QueueMessageContainer) iter.next();
276
277 QueueList browserList = (QueueList) browsers.get(container);
278 doPeek(container, browserList);
279 QueueList list = (QueueList) activeSubscriptions.get(container);
280 doPoll(container, list);
281 }
282 }
283 }
284
285 public MessageContainer getContainer(String destinationName) throws JMSException {
286 MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
287 if (container == null) {
288 synchronized (subscriptionMutex) {
289 container = super.getContainer(destinationName);
290 }
291 }
292 return container;
293 }
294
295 // Implementation methods
296 //-------------------------------------------------------------------------
297
298 protected MessageContainer createContainer(String destinationName) throws JMSException {
299 QueueMessageContainer container = new DurableQueueMessageContainer(persistenceAdapter, persistenceAdapter.createQueueMessageStore(destinationName), destinationName);
300
301 //Add any interested Subscriptions to the new Container
302 for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
303 Subscription sub = (Subscription) iter.next();
304 if (sub.isBrowser()) {
305 updateBrowsers(container, sub);
306 }
307 else {
308 updateActiveSubscriptions(container, sub);
309 }
310 }
311
312 ActiveMQDestination key = new ActiveMQQueue(destinationName);
313 destinationMap.put(key, container);
314 return container;
315 }
316
317 protected Destination createDestination(String destinationName) {
318 return new ActiveMQQueue(destinationName);
319 }
320
321 private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
322 if (browsers != null && browsers.size() > 0) {
323 for (int i = 0; i < browsers.size(); i++) {
324 SubscriptionImpl sub = (SubscriptionImpl) browsers.get(i);
325 int count = 0;
326 ActiveMQMessage msg = null;
327 do {
328 msg = container.peekNext(sub.getLastMessageIdentity());
329 if (msg != null) {
330 if (sub.isTarget(msg)) {
331 System.out.println("browser dispatch: "+msg.getJMSMessageID());
332 sub.addMessage(container, msg);
333 dispatcher.wakeup(sub);
334 }
335 else {
336 sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
337 }
338 }
339 }
340 while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
341 }
342 }
343 }
344
345 private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
346 int count = 0;
347 ActiveMQMessage msg = null;
348 if (subList != null && subList.size() > 0) {
349 do {
350 boolean dispatched = false;
351 msg = container.poll();
352 if (msg != null) {
353 QueueListEntry entry = subList.getFirstEntry();
354 boolean targeted = false;
355 while (entry != null) {
356 SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
357 if (sub.isTarget(msg)) {
358 targeted = true;
359 if (msg.isMessagePart()){
360 SubscriptionImpl sameTarget = (SubscriptionImpl)messagePartSubscribers.get(msg.getParentMessageID());
361 if (sameTarget == null){
362 sameTarget = sub;
363 messagePartSubscribers.put(msg.getParentMessageID(), sameTarget);
364 }
365 sameTarget.addMessage(container,msg);
366 if (msg.isLastMessagePart()){
367 messagePartSubscribers.remove(msg.getParentMessageID());
368 }
369 dispatched = true;
370 dispatcher.wakeup(sameTarget);
371 break;
372 }else if (!sub.isAtPrefetchLimit()) {
373 System.out.println("dispatching: "+msg.getJMSMessageID());
374 sub.addMessage(container, msg);
375 dispatched = true;
376 dispatcher.wakeup(sub);
377 subList.rotate(); //round-robin the list
378 break;
379 }
380 }
381 entry = subList.getNextEntry(entry);
382 }
383 if (!dispatched) {
384 if (targeted) { //ie. it can be selected by current active consumers - but they are at
385 // pre-fectch
386 // limit
387 container.returnMessage(msg.getJMSMessageIdentity());
388 }
389 break;
390 }
391 }
392 }
393 while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
394 }
395 }
396
397 private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
398 //need to do wildcards for this - but for now use exact matches
399 synchronized (subscriptionMutex) {
400 boolean processedSubscriptionContainer = false;
401
402 String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
403 for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
404 Map.Entry entry = (Map.Entry) iter.next();
405 String destinationName = (String) entry.getKey();
406 QueueMessageContainer container = (QueueMessageContainer) entry.getValue();
407
408 if (destinationName.equals(subscriptionPhysicalName)) {
409 processedSubscriptionContainer = true;
410 }
411 processSubscription(subscription, container);
412 }
413 if (!processedSubscriptionContainer) {
414 processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
415 }
416 }
417 }
418
419 protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
420 // TODO should change this for wild cards ...
421 if (subscription.isBrowser()) {
422 updateBrowsers(container, subscription);
423 }
424 else {
425 updateActiveSubscriptions(container, subscription);
426 }
427 }
428
429 private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
430 //need to do wildcards for this - but for now use exact matches
431 //should change this for wild cards ...
432 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
433 container.reset();//reset container - flushing all filter out messages to new consumer
434 QueueList list = getSubscriptionList(container);
435 if (!list.contains(sub)) {
436 list.add(sub);
437 }
438 }
439 }
440
441 private QueueList getSubscriptionList(QueueMessageContainer container) {
442 QueueList list = (QueueList) activeSubscriptions.get(container);
443 if (list == null) {
444 list = new DefaultQueueList();
445 activeSubscriptions.put(container, list);
446 }
447 return list;
448 }
449
450 private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
451 //need to do wildcards for this - but for now use exact matches
452 //should change this for wild cards ...
453 if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
454 container.reset();//reset container - flushing all filter out messages to new consumer
455 QueueList list = getBrowserList(container);
456 if (!list.contains(sub)) {
457 list.add(sub);
458 }
459 }
460 }
461
462 private QueueList getBrowserList(QueueMessageContainer container) {
463 QueueList list = (QueueList) browsers.get(container);
464 if (list == null) {
465 list = new DefaultQueueList();
466 browsers.put(container, list);
467 }
468 return list;
469 }
470
471 /**
472 * Create filter for a Consumer
473 *
474 * @param info
475 * @return the Fitler
476 * @throws javax.jms.JMSException
477 */
478 protected Filter createFilter(ConsumerInfo info) throws JMSException {
479 Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
480 if (info.isNoLocal()) {
481 filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
482 }
483 return filter;
484 }
485
486 public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
487 // This container only does queues.
488 if(!dest.isQueue())
489 return;
490 super.createMessageContainer(dest);
491 }
492
493 public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
494 // This container only does queues.
495 if(!dest.isQueue())
496 return;
497 super.destroyMessageContainer(dest);
498 destinationMap.removeAll(dest);
499 }
500
501 /**
502 * Add a message to a dead letter queue
503 * @param deadLetterName
504 * @param message
505 * @throws JMSException
506 */
507 public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage message) throws JMSException{
508 QueueMessageContainer container = (QueueMessageContainer)getContainer(deadLetterName);
509 container.setDeadLetterQueue(true);
510 container.addMessage(message);
511 dispatcher.wakeup();
512 }
513 }