001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service.impl;
020 import java.util.ArrayList;
021 import java.util.List;
022 import javax.jms.JMSException;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025 import org.activemq.broker.BrokerClient;
026 import org.activemq.broker.BrokerConnector;
027 import org.activemq.broker.BrokerContainer;
028 import org.activemq.broker.Broker;
029 import org.activemq.filter.Filter;
030 import org.activemq.message.ActiveMQDestination;
031 import org.activemq.message.ActiveMQMessage;
032 import org.activemq.message.BrokerInfo;
033 import org.activemq.message.ConsumerInfo;
034 import org.activemq.message.MessageAck;
035 import org.activemq.service.DeadLetterPolicy;
036 import org.activemq.service.Dispatcher;
037 import org.activemq.service.MessageContainer;
038 import org.activemq.service.MessageIdentity;
039 import org.activemq.service.QueueList;
040 import org.activemq.service.QueueListEntry;
041 import org.activemq.service.RedeliveryPolicy;
042 import org.activemq.service.SubscriberEntry;
043 import org.activemq.service.Subscription;
044 import org.activemq.service.TransactionManager;
045 import org.activemq.service.TransactionTask;
046 import org.activemq.security.SecurityAdapter;
047 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
048 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
049
050 /**
 * A Subscription holds messages to be dispatched to a Client Consumer
052 *
053 * @version $Revision: 1.1.1.1 $
054 */
055 public class SubscriptionImpl implements Subscription {
056 private static final Log log = LogFactory.getLog(SubscriptionImpl.class);
057 private String clientId;
058 private String subscriberName;
059 private ActiveMQDestination destination;
060 private String selector;
061 private int prefetchLimit;
062 private boolean noLocal;
063 private int consumerNumber;
064 private String consumerId;
065 private boolean browser;
066 protected Dispatcher dispatch;
067 protected String brokerName;
068 protected String clusterName;
069 protected MessageIdentity lastMessageIdentity;
070 private Filter filter;
071 protected SynchronizedInt unconsumedMessagesDispatched = new SynchronizedInt(0);
072 protected QueueList messagePtrs = new DefaultQueueList();
073 private boolean usePrefetch = true;
074 private SubscriberEntry subscriberEntry;
075 private BrokerClient activeClient;
076 private RedeliveryPolicy redeliveryPolicy;
077 private DeadLetterPolicy deadLetterPolicy;
078 private SynchronizedBoolean active = new SynchronizedBoolean(false);
079 private Object lock = new Object();
080
/**
 * Builds a subscription that buffers message pointers on behalf of one consumer.
 *
 * @param dispatcher the dispatcher that is woken up when this subscription has work
 * @param client the broker client attached to this subscription; may be null
 * @param info describes the consumer; its values are copied in when not null
 * @param filter used to decide which messages this subscription accepts
 * @param redeliveryPolicy consulted for back-off and maximum retry count on redelivery
 * @param deadLetterPolicy receives messages that are expired or exceed the retry count
 */
public SubscriptionImpl(Dispatcher dispatcher, BrokerClient client, ConsumerInfo info, Filter filter,
        RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
    this.deadLetterPolicy = deadLetterPolicy;
    this.redeliveryPolicy = redeliveryPolicy;
    this.filter = filter;
    this.dispatch = dispatcher;
    // must stay last: it runs after all collaborators have been stored
    setActiveConsumer(client, info);
}
099
100 /**
101 * Set the active consumer info
102 *
103 * @param client
104 * @param info
105 */
106 public void setActiveConsumer(BrokerClient client, ConsumerInfo info) {
107 if (info != null) {
108 this.clientId = info.getClientId();
109 this.subscriberName = info.getConsumerName();
110 this.noLocal = info.isNoLocal();
111 this.destination = info.getDestination();
112 this.selector = info.getSelector();
113 this.prefetchLimit = info.getPrefetchNumber();
114 this.consumerNumber = info.getConsumerNo();
115 this.consumerId = info.getConsumerId();
116 this.browser = info.isBrowser();
117 }
118 this.activeClient = client;
119 if (client != null) {
120 BrokerConnector brokerConnector = client.getBrokerConnector();
121 if (brokerConnector != null) {
122 BrokerInfo brokerInfo = brokerConnector.getBrokerInfo();
123 if (brokerInfo != null) {
124 brokerName = brokerInfo.getBrokerName();
125 clusterName = brokerInfo.getClusterName();
126 }
127 }
128 }
129 }
130
131 /**
132 * @return pretty print of the Subscription
133 */
134 public String toString() {
135 String str = "SubscriptionImpl(" + super.hashCode() + ")[" + consumerId + "]" + clientId + ": "
136 + subscriberName + " : " + destination;
137 return str;
138 }
139
140 /**
141 * Called when the Subscription is discarded
142 *
143 * @throws JMSException
144 */
145 public void clear() throws JMSException {
146 synchronized (lock) {
147 QueueListEntry entry = messagePtrs.getFirstEntry();
148 while (entry != null) {
149 MessagePointer pointer = (MessagePointer) entry.getElement();
150 pointer.clear();
151 entry = messagePtrs.getNextEntry(entry);
152 }
153 messagePtrs.clear();
154 }
155 }
156
157 /**
158 * Called when an active subscriber has closed. This resets all MessagePtrs
159 *
160 * @throws JMSException
161 */
162 public void reset() throws JMSException {
163 synchronized (lock) {
164 QueueListEntry entry = messagePtrs.getFirstEntry();
165 while (entry != null) {
166 MessagePointer pointer = (MessagePointer) entry.getElement();
167 if (pointer.isDispatched() && !pointer.isDeleted()) {
168 pointer.reset();
169 pointer.setRedelivered(true);
170 }
171 else {
172 break;
173 }
174 entry = messagePtrs.getNextEntry(entry);
175 }
176 }
177 }
178
179 public BrokerClient getActiveClient() {
180 return activeClient;
181 }
182
183 /**
184 * @return Returns the clientId.
185 */
186 public String getClientId() {
187 return clientId;
188 }
189
190 /**
191 * @param clientId The clientId to set.
192 */
193 public void setClientId(String clientId) {
194 this.clientId = clientId;
195 }
196
197 /**
198 * @return Returns the filter.
199 */
200 public Filter getFilter() {
201 return filter;
202 }
203
204 /**
205 * @param filter The filter to set.
206 */
207 public void setFilter(Filter filter) {
208 this.filter = filter;
209 }
210
211 public boolean isWildcard() {
212 return filter.isWildcard();
213 }
214
215 public String getPersistentKey() {
216 // not required other than for persistent topic subscriptions
217 return null;
218 }
219
220 public boolean isSameDurableSubscription(ConsumerInfo info) throws JMSException {
221 if (isDurableTopic()) {
222 return equal(clientId, info.getClientId()) && equal(subscriberName, info.getConsumerName());
223 }
224 return false;
225 }
226
227 /**
228 * @return Returns the noLocal.
229 */
230 public boolean isNoLocal() {
231 return noLocal;
232 }
233
234 /**
235 * @param noLocal The noLocal to set.
236 */
237 public void setNoLocal(boolean noLocal) {
238 this.noLocal = noLocal;
239 }
240
241 /**
242 * @return Returns the subscriberName.
243 */
244 public String getSubscriberName() {
245 return subscriberName;
246 }
247
248 /**
249 * @param subscriberName The subscriberName to set.
250 */
251 public void setSubscriberName(String subscriberName) {
252 this.subscriberName = subscriberName;
253 }
254
255 public RedeliveryPolicy getRedeliveryPolicy() {
256 return redeliveryPolicy;
257 }
258
259 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
260 this.redeliveryPolicy = redeliveryPolicy;
261 }
262
263 /**
264 * determines if the Subscription is interested in the message
265 *
266 * @param message
267 * @return true if this Subscription will accept the message
268 * @throws JMSException
269 */
270 public boolean isTarget(ActiveMQMessage message) throws JMSException {
271 boolean result = false;
272 if (message != null) {
273 if (activeClient == null || brokerName == null || clusterName == null
274 || !activeClient.isClusteredConnection() || !message.isEntryCluster(clusterName)
275 || message.isEntryBroker(brokerName)) {
276 result = message.isDispatchedFromDLQ() || filter.matches(message);
277 // lets check that we don't have no-local enabled
278 if (noLocal && result) {
279 if (clientIDsEqual(message)) {
280 result = false;
281 }
282 }
283
284 if (result && !isAuthorizedForMessage(message)) {
285 result = false;
286 }
287 }
288 }
289 return result;
290 }
291
292 /**
293 * If the Subscription is a target for the message, the subscription will add a reference to the message and
294 * register an interest in the message to the container
295 *
296 * @param container
297 * @param message
298 * @throws JMSException
299 */
300 public void addMessage(MessageContainer container, ActiveMQMessage message) throws JMSException {
301 //log.info("###### Adding to subscription: " + this + " message: " + message);
302 if (log.isDebugEnabled()) {
303 log.debug("Adding to subscription: " + this + " message: " + message);
304 }
305 MessagePointer pointer = new MessagePointer(container, message);
306 synchronized (lock) {
307 messagePtrs.add(pointer);
308 }
309 dispatch.wakeup(this);
310 lastMessageIdentity = message.getJMSMessageIdentity();
311 }
312
313 /**
314 * Indicates a message has been delivered to a MessageConsumer
315 *
316 * @param ack
317 * @throws JMSException
318 */
319 public void messageConsumed(final MessageAck ack) throws JMSException {
320 //remove up to this message
321 int count = 0;
322 boolean found = false;
323 synchronized (lock) {
324 QueueListEntry entry = messagePtrs.getFirstEntry();
325 while (entry != null) {
326 final MessagePointer pointer = (MessagePointer) entry.getElement();
327 count++;
328 // If in transaction: only consume the message acked.
329 // If not in transaction: consume all previously delivered messages.
330 if (!ack.isPartOfTransaction() || pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
331 if ((ack.isExpired() || ack.isMessageRead()) && !browser) {
332 pointer.delete(ack);//delete message from the container (if possible)
333 }
334 if (!ack.isMessageRead() && !browser) {
335 // It was a NACK.
336 pointer.reset();
337 pointer.setRedelivered(true);
338 }
339 else {
340 unconsumedMessagesDispatched.decrement();
341 // We may have to undo the delivery..
342 TransactionManager.getContexTransaction().addPostRollbackTask(new TransactionTask() {
343 public void execute() throws Throwable {
344 unconsumedMessagesDispatched.increment();
345 pointer.reset();
346 pointer.setRedelivered(true);
347 dispatch.wakeup(SubscriptionImpl.this);
348 }
349 });
350 final QueueListEntry theEntry = entry;
351 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask() {
352 public void execute() throws Throwable {
353 messagePtrs.remove(theEntry);
354 if ((ack.isExpired() || ack.isMessageRead()) && !browser) {
355 if (ack.isExpired() && !pointer.getContainer().isDeadLetterQueue()) {
356 ActiveMQMessage msg = pointer.getContainer().getMessage(
357 pointer.getMessageIdentity());
358 if (msg != null) {
359 deadLetterPolicy.sendToDeadLetter(msg);
360 }
361 }
362 }
363 }
364 });
365 }
366 if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
367 found = true;
368 break;
369 }
370 }
371 entry = messagePtrs.getNextEntry(entry);
372 }
373 }
374 if (!found && log.isDebugEnabled()) {
375 log.debug("Did not find a matching message for identity: " + ack.getMessageIdentity());
376 }
377 dispatch.wakeup(this);
378 }
379
380 /**
381 * Retrieve messages to dispatch
382 *
383 * @return the messages to dispatch
384 * @throws JMSException
385 */
386 public ActiveMQMessage[] getMessagesToDispatch() throws JMSException {
387 if (usePrefetch) {
388 return getMessagesWithPrefetch();
389 }
390 List tmpList = new ArrayList();
391 synchronized (lock) {
392 QueueListEntry entry = messagePtrs.getFirstEntry();
393 while (entry != null) {
394 MessagePointer pointer = (MessagePointer) entry.getElement();
395 if (!pointer.isDispatched()) {
396 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
397 if (msg != null) {
398 if (pointer.isDispatched() || pointer.isRedelivered()) {
399 //already dispatched - so mark as redelivered
400 msg.setJMSRedelivered(true);
401 if (redeliveryPolicy.isBackOffMode()
402 && msg.getDeliveryCount() < redeliveryPolicy.getMaximumRetryCount()) {
403 long sleepTime = redeliveryPolicy.getInitialRedeliveryTimeout();
404 sleepTime *= (msg.getDeliveryCount() * redeliveryPolicy.getBackOffIncreaseRate());
405 try {
406 Thread.sleep(sleepTime);
407 }
408 catch (InterruptedException e) {
409 }
410 }
411 //incremenent delivery count
412 msg.incrementDeliveryCount();
413 }
414 if (!pointer.getContainer().isDeadLetterQueue()
415 && (msg.isExpired() || msg.getDeliveryCount() >= redeliveryPolicy
416 .getMaximumRetryCount())) {
417 if (msg.isExpired()) {
418 log.warn("Message: " + msg + " has expired");
419 }
420 else {
421 log.warn("Message: " + msg + " exceeded retry count: " + msg.getDeliveryCount());
422 }
423 deadLetterPolicy.sendToDeadLetter(msg);
424 QueueListEntry discarded = entry;
425 entry = messagePtrs.getPrevEntry(discarded);
426 messagePtrs.remove(discarded);
427 }
428 else {
429 pointer.setDispatched(true);
430 msg.setDispatchedFromDLQ(pointer.getContainer().isDeadLetterQueue());
431 tmpList.add(msg);
432 }
433 }
434 else {
435 //the message is probably expired
436 log.info("Message probably expired: " + msg);
437 QueueListEntry discarded = entry;
438 entry = messagePtrs.getPrevEntry(discarded);
439 messagePtrs.remove(discarded);
440 if (msg != null) {
441 deadLetterPolicy.sendToDeadLetter(msg);
442 }
443 }
444 }
445 entry = messagePtrs.getNextEntry(entry);
446 }
447 }
448 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
449 return (ActiveMQMessage[]) tmpList.toArray(messages);
450 }
451
452 public SubscriberEntry getSubscriptionEntry() {
453 if (subscriberEntry == null) {
454 subscriberEntry = createSubscriptionEntry();
455 }
456 return subscriberEntry;
457 }
458
459 public boolean isLocalSubscription() {
460 if (activeClient != null) {
461 return !(activeClient.isClusteredConnection() || activeClient.isBrokerConnection());
462 }
463 return true;
464 }
465
466 // Implementation methods
467 //-------------------------------------------------------------------------
468 protected SubscriberEntry createSubscriptionEntry() {
469 SubscriberEntry answer = new SubscriberEntry();
470 answer.setClientID(clientId);
471 answer.setConsumerName(subscriberName);
472 answer.setDestination(destination.getPhysicalName());
473 answer.setSelector(selector);
474 return answer;
475 }
476
477 protected ActiveMQMessage[] getMessagesWithPrefetch() throws JMSException {
478
479 List tmpList = new ArrayList();
480 synchronized (lock) {
481 QueueListEntry entry = messagePtrs.getFirstEntry();
482 int count = 0;
483 boolean fragmentedMessages = false;
484 int maxNumberToDispatch = prefetchLimit - unconsumedMessagesDispatched.get();
485 while (entry != null && (count < maxNumberToDispatch || fragmentedMessages)) {
486 MessagePointer pointer = (MessagePointer) entry.getElement();
487 if (!pointer.isDispatched()) {
488 ActiveMQMessage msg = pointer.getContainer().getMessage(pointer.getMessageIdentity());
489 if (msg != null && !msg.isExpired()) {
490 if (pointer.isDispatched() || pointer.isRedelivered()) {
491 //already dispatched - so mark as redelivered
492 msg.setJMSRedelivered(true);
493 }
494 pointer.setDispatched(true);
495 tmpList.add(msg);
496 fragmentedMessages = msg.isMessagePart() && !msg.isLastMessagePart();
497 unconsumedMessagesDispatched.increment();
498 count++;
499 }
500 else {
501 //the message is probably expired
502 log.info("Message probably expired: " + msg);
503 QueueListEntry discarded = entry;
504 entry = messagePtrs.getPrevEntry(discarded);
505 messagePtrs.remove(discarded);
506 if (msg != null) {
507 deadLetterPolicy.sendToDeadLetter(msg);
508 }
509 }
510 }
511 entry = messagePtrs.getNextEntry(entry);
512 }
513 }
514 /**
515 * if (tmpList.isEmpty() && ! messagePtrs.isEmpty()) { System.out.println("### Nothing to dispatch but
516 * messagePtrs still has: " + messagePtrs.size() + " to dispatch, prefetchLimit: " + prefetchLimit + "
517 * unconsumedMessagesDispatched: " + unconsumedMessagesDispatched.get() + " maxNumberToDispatch: " +
518 * maxNumberToDispatch); MessagePointer first = (MessagePointer) messagePtrs.getFirst(); System.out.println("###
519 * First: " + first + " dispatched: " + first.isDispatched() + " id: " + first.getMessageIdentity()); } else {
520 * if (! tmpList.isEmpty()) { System.out.println("### dispatching: " + tmpList.size() + " items = " + tmpList); } }
521 */
522 ActiveMQMessage[] messages = new ActiveMQMessage[tmpList.size()];
523 return (ActiveMQMessage[]) tmpList.toArray(messages);
524 }
525
526 /**
527 * Indicates the Subscription it's reached it's pre-fetch limit
528 *
529 * @return true/false
530 * @throws JMSException
531 */
532 public boolean isAtPrefetchLimit() throws JMSException {
533 if (usePrefetch) {
534 int underlivedMessageCount = messagePtrs.size() - unconsumedMessagesDispatched.get();
535 return underlivedMessageCount >= prefetchLimit;
536 }
537 else {
538 return false;
539 }
540 }
541
542 /**
543 * Indicates if this Subscription has more messages to send to the Consumer
544 *
545 * @return true if more messages available to dispatch
546 */
547 public boolean isReadyToDispatch() throws JMSException {
548 /** TODO we may have dispatched messags inside messagePtrs */
549 boolean answer = active.get() && messagePtrs.size() > 0;
550 return answer;
551 }
552
553 /**
554 * @return Returns the destination.
555 */
556 public ActiveMQDestination getDestination() {
557 return destination;
558 }
559
560 /**
561 * @return Returns the selector.
562 */
563 public String getSelector() {
564 return selector;
565 }
566
567 /**
568 * @return Returns the active.
569 */
570 public boolean isActive() {
571 return active.get();
572 }
573
574 /**
575 * @param newActive The active to set.
576 * @throws JMSException
577 */
578 public void setActive(boolean newActive) throws JMSException {
579 synchronized (active.getLock()) {
580 active.set(newActive);
581 }
582 if (!newActive) {
583 reset();
584 }
585 }
586
587 /**
588 * @return Returns the consumerNumber.
589 */
590 public int getConsumerNumber() {
591 return consumerNumber;
592 }
593
594 /**
595 * @return the consumer Id for the active consumer
596 */
597 public String getConsumerId() {
598 return consumerId;
599 }
600
601 /**
602 * Indicates the Subscriber is a Durable Subscriber
603 *
604 * @return true if the subscriber is a durable topic
605 * @throws JMSException
606 */
607 public boolean isDurableTopic() throws JMSException {
608 return destination.isTopic() && subscriberName != null && subscriberName.length() > 0;
609 }
610
611 /**
612 * Indicates the consumer is a browser only
613 *
614 * @return true if a Browser
615 * @throws JMSException
616 */
617 public boolean isBrowser() throws JMSException {
618 return browser;
619 }
620
621 public MessageIdentity getLastMessageIdentity() throws JMSException {
622 return lastMessageIdentity;
623 }
624
625 public void setLastMessageIdentifier(MessageIdentity messageIdentity) throws JMSException {
626 this.lastMessageIdentity = messageIdentity;
627 }
628
629 protected boolean clientIDsEqual(ActiveMQMessage message) {
630 String msgClientID = message.getJMSClientID();
631 String subClientID = clientId;
632 if (msgClientID == null || subClientID == null) {
633 return false;
634 }
635 else {
636 return msgClientID.equals(subClientID);
637 }
638 }
639
640 protected static final boolean equal(Object left, Object right) {
641 return left == right || (left != null && right != null && left.equals(right));
642 }
643
644
645 /**
646 * Returns whether or not the consumer can receive the given message
647 */
648 protected boolean isAuthorizedForMessage(ActiveMQMessage message) {
649 // TODO we could maybe provide direct access to the security adapter
650 BrokerClient client = getActiveClient();
651 if (client != null) {
652 BrokerConnector connector = client.getBrokerConnector();
653 if (connector != null) {
654 BrokerContainer container = connector.getBrokerContainer();
655 if (container != null) {
656 Broker broker = container.getBroker();
657 if (broker != null) {
658 SecurityAdapter securityAdapter = broker.getSecurityAdapter();
659 if (securityAdapter != null) {
660 return securityAdapter.authorizeReceive(client, message);
661 }
662 }
663 }
664 }
665 }
666 return true;
667 }
668 }