001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.service.impl;
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Map;
024
025 import javax.jms.JMSException;
026 import javax.transaction.xa.XAException;
027
028 import org.activemq.broker.Broker;
029 import org.activemq.broker.BrokerClient;
030 import org.activemq.message.ActiveMQMessage;
031 import org.activemq.message.ActiveMQXid;
032 import org.activemq.message.MessageAck;
033 import org.activemq.service.Transaction;
034 import org.activemq.service.TransactionManager;
035 import org.activemq.store.TransactionStore;
036 import org.activemq.store.TransactionStore.RecoveryListener;
037 import org.activemq.util.JMSExceptionHelper;
038 import org.apache.commons.logging.Log;
039 import org.apache.commons.logging.LogFactory;
040
041 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
042
043 /**
044 * @version $Revision: 1.1.1.1 $
045 */
046 public class TransactionManagerImpl extends TransactionManager {
047 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
048
049 // the broker on which transactions operate
050 private Broker broker;
051 // The prepared XA transactions.
052 private TransactionStore transactionStore;
053 // Maps clients to the txids that they created.
054 private Map activeClients = new ConcurrentHashMap();
055 // Maps txids to ActiveMQTransactions
056 private Map localTxs = new ConcurrentHashMap();
057 // Maps txids to ActiveMQTransactions
058 private Map xaTxs = new ConcurrentHashMap();
059
060 public TransactionManagerImpl(Broker broker, TransactionStore transactionStore) {
061 this.transactionStore = transactionStore;
062 this.broker = broker;
063 }
064
065 /**
066 * @see org.activemq.service.TransactionManager#createLocalTransaction(org.activemq.broker.BrokerClient, String)
067 */
068 public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
069 AbstractTransaction t = new LocalTransactionCommand(localTxs, txid, transactionStore);
070 localTxs.put(txid, t);
071 return t;
072 }
073
074 /**
075 * @see org.activemq.service.TransactionManager#createXATransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
076 */
077 public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
078
079 // The xa transaction may allready be running.
080 Transaction tx = (Transaction) xaTxs.get(xid);
081 if( tx == null ) {
082 if(log.isDebugEnabled())
083 log.debug("XA Transaction started: "+xid);
084 tx = new XATransactionCommand(xid, xaTxs, transactionStore);
085 xaTxs.put(xid, tx);
086 }
087 return tx;
088
089 }
090
091 /**
092 * @see org.activemq.service.TransactionManager#getLocalTransaction(String)
093 */
094 public Transaction getLocalTransaction(String txid) throws JMSException {
095 Transaction tx = (Transaction) localTxs.get(txid);
096 if (tx == null) {
097 throw new JMSException("Transaction '" + txid
098 + "' has not been started.");
099 }
100 return tx;
101 }
102
103 /**
104 * @see org.activemq.service.TransactionManager#getXATransaction(org.activemq.message.ActiveMQXid)
105 */
106 public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
107 Transaction tx = (Transaction) xaTxs.get(xid);
108 if (tx == null) {
109 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
110 e.errorCode = XAException.XAER_NOTA;
111 throw e;
112 }
113 return tx;
114 }
115
116 /**
117 * @see org.activemq.service.TransactionManager#getPreparedXATransactions()
118 */
119 public ActiveMQXid[] getPreparedXATransactions() throws XAException {
120 ArrayList txs = new ArrayList(xaTxs.size());
121 for (Iterator iter = xaTxs.keySet().iterator(); iter.hasNext();) {
122 ActiveMQXid tx = (ActiveMQXid) iter.next();
123 txs.add(tx);
124 }
125 ActiveMQXid rc[] = new ActiveMQXid[txs.size()];
126 txs.toArray(rc);
127 return rc;
128 }
129
130
131 /**
132 * @see org.activemq.service.TransactionManager#cleanUpClient(org.activemq.broker.BrokerClient)
133 */
134 public void cleanUpClient(BrokerClient client) throws JMSException {
135 // HRC: I don't think we need to keep track of the client's open transactions here...
136 // It seems like BrokerClientImpl.close() allready cleans up open transactions.
137 //
138 List list = (List) activeClients.remove(client);
139 if (list != null) {
140 for (int i = 0; i < list.size(); i++) {
141 try {
142 Object o = list.get(i);
143 if (o instanceof String) {
144 Transaction t = this.getLocalTransaction((String) o);
145 t.rollback();
146 }
147 else {
148 Transaction t = this.getXATransaction((ActiveMQXid) o);
149 t.rollback();
150 }
151 }
152 catch (Exception e) {
153 log.warn("ERROR Rolling back disconnected client's transactions: ", e);
154 }
155 }
156 list.clear();
157 }
158 }
159
160 /**
161 * @see org.activemq.service.TransactionManager#recover(org.activemq.service.Transaction)
162 */
163 public void recover(Transaction transaction) {
164 // first lets associate any transient data structurs with the
165 // transaction which has recently been loaded from disk
166 if (transaction instanceof XATransactionCommand) {
167 XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
168 xaTransaction.initialise(xaTxs, transactionStore);
169 xaTxs.put(transaction.getTransactionId(), transaction);
170 }
171 }
172
173 public void start() throws JMSException {
174 transactionStore.start();
175 try {
176 transactionStore.recover(new RecoveryListener(){
177 public void recover(ActiveMQXid xid, ActiveMQMessage[] addedMessages, MessageAck[] aks) throws JMSException, XAException {
178 Transaction transaction = createXATransaction(null, xid);
179 for (int i = 0; i < addedMessages.length; i++) {
180 broker.sendMessage(null, addedMessages[i]);
181 }
182 for (int i = 0; i < aks.length; i++) {
183 broker.acknowledgeMessage(null, aks[i]);
184 }
185 transaction.prepare();
186 }
187 });
188 } catch (XAException e) {
189 throw JMSExceptionHelper.newJMSException("Recovery Failed: "+e.getMessage(), e);
190 }
191 }
192
193 public void stop() throws JMSException {
194 transactionStore.stop();
195 }
196
197 }