001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.store.vm;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.LinkedHashMap;
024 import java.util.Map;
025
026 import javax.jms.JMSException;
027
028 import org.activemq.message.ActiveMQMessage;
029 import org.activemq.message.ConsumerInfo;
030 import org.activemq.service.MessageIdentity;
031 import org.activemq.service.SubscriberEntry;
032 import org.activemq.store.RecoveryListener;
033 import org.activemq.store.TopicMessageStore;
034
035 /**
036 * @version $Revision: 1.1.1.1 $
037 */
038 public class VMTopicMessageStore extends VMMessageStore implements TopicMessageStore {
039 private static final Integer ONE = new Integer(1);
040
041 private Map ackDatabase;
042 private Map messageCounts;
043 private Map subscriberDatabase;
044
045 public VMTopicMessageStore() {
046 this(new LinkedHashMap(), makeMap(), makeMap(), makeMap());
047 }
048
049 public VMTopicMessageStore(LinkedHashMap messageTable, Map subscriberDatabase, Map ackDatabase, Map messageCounts) {
050 super(messageTable);
051 this.subscriberDatabase = subscriberDatabase;
052 this.ackDatabase = ackDatabase;
053 this.messageCounts = messageCounts;
054 }
055
056 public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
057 Integer number = (Integer) messageCounts.get(messageId.getMessageID());
058 if (number == null) {
059 number = ONE;
060 }
061 else {
062 number = new Integer(number.intValue() + 1);
063 }
064 messageCounts.put(messageId.getMessageID(), number);
065 }
066
067 public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity msgId) throws JMSException {
068 Integer number = (Integer) messageCounts.get(msgId.getMessageID());
069 if (number == null || number.intValue() <= 1) {
070 removeMessage(msgId);
071 if (number != null) {
072 messageCounts.remove(msgId.getMessageID());
073 }
074 }
075 else {
076 messageCounts.put(msgId.getMessageID(), new Integer(number.intValue() - 1));
077 number = ONE;
078 }
079 }
080
081 public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
082 ackDatabase.put(subscription, messageIdentity);
083 }
084
085 public synchronized void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException {
086 //iterate through the message table and populate the subscriber
087 Map map = new HashMap(messageTable);
088 boolean alreadyAcked = true;
089 MessageIdentity lastAcked = (MessageIdentity)ackDatabase.get(subscriptionId);
090 if( lastAcked==null )
091 return;
092
093 for (Iterator i = map.values().iterator(); i.hasNext(); ){
094 ActiveMQMessage msg = (ActiveMQMessage)i.next();
095 if (!alreadyAcked){
096 listener.recoverMessage(msg.getJMSMessageIdentity());
097 }
098 if (lastAcked.getMessageID().equals(msg.getJMSMessageID())){
099 alreadyAcked = false;
100 }
101 }
102 }
103
104 public MessageIdentity getLastestMessageIdentity() throws JMSException {
105 return super.lastMessageIdentity;
106 }
107
108 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
109 Object key = info.getConsumerKey();
110 return (SubscriberEntry) subscriberDatabase.get(key);
111 }
112
113 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
114 subscriberDatabase.put(info.getConsumerKey(), subscriberEntry);
115 }
116
117 public void stop() throws JMSException {
118 }
119
120 protected static Map makeMap() {
121 return Collections.synchronizedMap(new HashMap());
122 }
123
124 public void deleteSubscription(String sub) {
125 ackDatabase.remove(sub);
126 }
127
128 }