001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.store.vm;
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022
023 import javax.jms.JMSException;
024 import javax.transaction.xa.XAException;
025
026 import org.activemq.message.ActiveMQMessage;
027 import org.activemq.message.ActiveMQXid;
028 import org.activemq.message.MessageAck;
029 import org.activemq.store.MessageStore;
030 import org.activemq.store.ProxyMessageStore;
031 import org.activemq.store.ProxyTopicMessageStore;
032 import org.activemq.store.TopicMessageStore;
033 import org.activemq.store.TransactionStore;
034
035 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
036
037 /**
038 * Provides a TransactionStore implementation that can create transaction aware
039 * MessageStore objects from non transaction aware MessageStore objects.
040 *
041 * @version $Revision: 1.1.1.1 $
042 */
043 public class VMTransactionStore implements TransactionStore {
044
045 ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
046
047 ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
048
049 private boolean doingRecover;
050
051 public static class Tx {
052 private ArrayList messages = new ArrayList();
053
054 private ArrayList acks = new ArrayList();
055
056 public void add(AddMessageCommand msg) {
057 messages.add(msg);
058 }
059
060 public void add(RemoveMessageCommand ack) {
061 acks.add(ack);
062 }
063
064 public ActiveMQMessage[] getMessages() {
065 ActiveMQMessage rc[] = new ActiveMQMessage[messages.size()];
066 int count=0;
067 for (Iterator iter = messages.iterator(); iter.hasNext();) {
068 AddMessageCommand cmd = (AddMessageCommand) iter.next();
069 rc[count++] = cmd.getMessage();
070 }
071 return rc;
072 }
073
074 public MessageAck[] getAcks() {
075 MessageAck rc[] = new MessageAck[acks.size()];
076 int count=0;
077 for (Iterator iter = acks.iterator(); iter.hasNext();) {
078 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
079 rc[count++] = cmd.getMessageAck();
080 }
081 return rc;
082 }
083
084 /**
085 * @throws JMSException
086 */
087 public void commit() throws XAException {
088 try {
089 // Do all the message adds.
090 for (Iterator iter = messages.iterator(); iter.hasNext();) {
091 AddMessageCommand cmd = (AddMessageCommand) iter.next();
092 cmd.run();
093 }
094 // And removes..
095 for (Iterator iter = acks.iterator(); iter.hasNext();) {
096 RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
097 cmd.run();
098 }
099 } catch ( JMSException e) {
100 throw (XAException)new XAException(XAException.XAER_RMFAIL).initCause(e);
101 }
102 }
103 }
104
105 public interface AddMessageCommand {
106 ActiveMQMessage getMessage();
107 void run() throws JMSException;
108 }
109
110 public interface RemoveMessageCommand {
111 MessageAck getMessageAck();
112 void run() throws JMSException;
113 }
114
115 public MessageStore proxy(MessageStore messageStore) {
116 return new ProxyMessageStore(messageStore) {
117 public void addMessage(final ActiveMQMessage message) throws JMSException {
118 VMTransactionStore.this.addMessage(getDelegate(), message);
119 }
120
121 public void removeMessage(final MessageAck ack) throws JMSException {
122 VMTransactionStore.this.removeMessage(getDelegate(), ack);
123 }
124 };
125 }
126
127 public TopicMessageStore proxy(TopicMessageStore messageStore) {
128 return new ProxyTopicMessageStore(messageStore) {
129 public void addMessage(final ActiveMQMessage message) throws JMSException {
130 VMTransactionStore.this.addMessage(getDelegate(), message);
131 }
132 public void removeMessage(final MessageAck ack) throws JMSException {
133 VMTransactionStore.this.removeMessage(getDelegate(), ack);
134 }
135 };
136 }
137
138 /**
139 * @see org.activemq.store.TransactionStore#prepare(org.activemq.service.Transaction)
140 */
141 public void prepare(Object txid) {
142 Tx tx = (Tx) inflightTransactions.remove(txid);
143 if (tx == null)
144 return;
145 preparedTransactions.put(txid, tx);
146 }
147
148 public Tx getTx(Object txid) {
149 Tx tx = (Tx) inflightTransactions.get(txid);
150 if (tx == null) {
151 tx = new Tx();
152 inflightTransactions.put(txid, tx);
153 }
154 return tx;
155 }
156
157 /**
158 * @throws XAException
159 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
160 */
161 public void commit(Object txid, boolean wasPrepared) throws XAException {
162
163 Tx tx;
164 if( wasPrepared ) {
165 tx = (Tx) preparedTransactions.remove(txid);
166 } else {
167 tx = (Tx) inflightTransactions.remove(txid);
168 }
169
170 if( tx == null )
171 return;
172 tx.commit();
173
174 }
175
176 /**
177 * @see org.activemq.store.TransactionStore#rollback(org.activemq.service.Transaction)
178 */
179 public void rollback(Object txid) {
180 preparedTransactions.remove(txid);
181 inflightTransactions.remove(txid);
182 }
183
184 public void start() throws JMSException {
185 }
186
187 public void stop() throws JMSException {
188 }
189
190 synchronized public void recover(RecoveryListener listener) throws XAException {
191
192 // All the inflight transactions get rolled back..
193 inflightTransactions.clear();
194 this.doingRecover = true;
195 try {
196 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
197 Object txid = (Object) iter.next();
198 try {
199 Tx tx = (Tx) preparedTransactions.get(txid);
200 listener.recover((ActiveMQXid) txid, tx.getMessages(), tx.getAcks());
201 } catch (JMSException e) {
202 throw (XAException) new XAException("Recovery of a transaction failed:").initCause(e);
203 }
204 }
205 } finally {
206 this.doingRecover = false;
207 }
208 }
209
210 /**
211 * @param message
212 * @throws JMSException
213 */
214 void addMessage(final MessageStore destination, final ActiveMQMessage message) throws JMSException {
215
216 if( doingRecover )
217 return;
218
219 if (message.isPartOfTransaction()) {
220 Tx tx = getTx(message.getTransactionId());
221 tx.add(new AddMessageCommand() {
222 public ActiveMQMessage getMessage() {
223 return message;
224 }
225 public void run() throws JMSException {
226 destination.addMessage(message);
227 }
228 });
229 } else {
230 destination.addMessage(message);
231 }
232 }
233
234 /**
235 * @param ack
236 * @throws JMSException
237 */
238 private void removeMessage(final MessageStore destination,final MessageAck ack) throws JMSException {
239 if( doingRecover )
240 return;
241
242 if (ack.isPartOfTransaction()) {
243 Tx tx = getTx(ack.getTransactionId());
244 tx.add(new RemoveMessageCommand() {
245 public MessageAck getMessageAck() {
246 return ack;
247 }
248 public void run() throws JMSException {
249 destination.removeMessage(ack);
250 }
251 });
252 } else {
253 destination.removeMessage(ack);
254 }
255 }
256
257 }