001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 * Copyright 2004 Protique Ltd
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019 package org.activemq.store.jdbc;
020
021 import java.sql.Connection;
022 import java.sql.SQLException;
023
024 import javax.jms.JMSException;
025
026 import org.activemq.io.WireFormat;
027 import org.activemq.message.ConsumerInfo;
028 import org.activemq.service.MessageIdentity;
029 import org.activemq.service.SubscriberEntry;
030 import org.activemq.store.RecoveryListener;
031 import org.activemq.store.TopicMessageStore;
032 import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
033 import org.activemq.util.JMSExceptionHelper;
034
035 /**
036 * @version $Revision: 1.1 $
037 */
038 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
039
040 public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
041 super(persistenceAdapter, adapter, wireFormat, destinationName);
042 }
043
044 public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
045 long seq = getMessageSequenceId(messageIdentity);
046 // Get a connection and insert the message into the DB.
047 Connection c = null;
048 try {
049 c = persistenceAdapter.getConnection();
050 adapter.doSetLastAck(c, destinationName, subscription, seq);
051 }
052 catch (SQLException e) {
053 throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e);
054 }
055 finally {
056 persistenceAdapter.returnConnection(c);
057 }
058 }
059
060 /**
061 * @see org.activemq.store.TopicMessageStore#getLastestMessageIdentity()
062 */
063 public MessageIdentity getLastestMessageIdentity() throws JMSException {
064 return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
065 }
066
067 /**
068 *
069 */
070 public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, final RecoveryListener listener) throws JMSException {
071
072 Connection c = null;
073 try {
074 c = persistenceAdapter.getConnection();
075 adapter.doRecoverSubscription(c, destinationName, subscriptionId, new MessageListResultHandler() {
076 public void onMessage(long seq, String messageID) throws JMSException {
077 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
078 listener.recoverMessage(messageIdentity);
079 }
080 });
081 }
082 catch (SQLException e) {
083 throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscriptionId + ". Reason: " + e, e);
084 }
085 finally {
086 persistenceAdapter.returnConnection(c);
087 }
088 }
089
090 /**
091 * @see org.activemq.store.TopicMessageStore#setSubscriberEntry(org.activemq.message.ConsumerInfo, org.activemq.service.SubscriberEntry)
092 */
093 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
094 String key = info.getConsumerKey();
095 Connection c = null;
096 try {
097 c = persistenceAdapter.getConnection();
098 adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
099 }
100 catch (SQLException e) {
101 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
102 }
103 finally {
104 persistenceAdapter.returnConnection(c);
105 }
106 }
107
108 /**
109 * @see org.activemq.store.TopicMessageStore#getSubscriberEntry(org.activemq.message.ConsumerInfo)
110 */
111 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
112 String key = info.getConsumerKey();
113 Connection c = null;
114 try {
115 c = persistenceAdapter.getConnection();
116 return adapter.doGetSubscriberEntry(c, destinationName, key);
117 }
118 catch (SQLException e) {
119 throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
120 }
121 finally {
122 persistenceAdapter.returnConnection(c);
123 }
124 }
125
126 public void deleteSubscription(String subscription) throws JMSException {
127 Connection c = null;
128 try {
129 c = persistenceAdapter.getConnection();
130 adapter.doDeleteSubscription(c, destinationName, subscription);
131 }
132 catch (SQLException e) {
133 throw JMSExceptionHelper.newJMSException("Failed to remove subscription for: " + subscription + ". Reason: " + e, e);
134 }
135 finally {
136 persistenceAdapter.returnConnection(c);
137 }
138 }
139
140 public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
141 }
142
143 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity) throws JMSException {
144 }
145
146 }