001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.store.journal;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Iterator;
024
025 import javax.jms.JMSException;
026 import javax.transaction.xa.XAException;
027
028 import org.activeio.journal.RecordLocation;
029 import org.activemq.message.ActiveMQMessage;
030 import org.activemq.message.ActiveMQXid;
031 import org.activemq.message.MessageAck;
032 import org.activemq.store.TransactionStore;
033 import org.apache.derby.iapi.store.raw.xact.TransactionId;
034
035 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
036
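/**
 * A {@link TransactionStore} implementation that keeps in-flight and prepared
 * transactions in memory and writes prepare, commit and rollback commands
 * through a {@link JournalPersistenceAdapter}.
 */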
039 public class JournalTransactionStore implements TransactionStore {
040
// Adapter whose writeTxCommand(...) is called by prepare/commit/rollback.
private final JournalPersistenceAdapter peristenceAdapter;
// Transactions created by getTx(...) that have not been prepared yet, keyed by transaction id.
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
// Transactions moved out of inflightTransactions by prepare()/replayPrepare(), keyed by transaction id.
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
044
045 public static class TxOperation {
046
047 static final byte ADD_OPERATION_TYPE = 0;
048 static final byte REMOVE_OPERATION_TYPE = 1;
049 static final byte ACK_OPERATION_TYPE = 3;
050
051 public byte operationType;
052 public JournalMessageStore store;
053 public Object data;
054
055 public TxOperation(byte operationType, JournalMessageStore store, Object data) {
056 this.operationType=operationType;
057 this.store=store;
058 this.data=data;
059 }
060
061 }
062 /**
063 * Operations
064 * @version $Revision: 1.3 $
065 */
066 public static class Tx {
067
068 private final RecordLocation location;
069 private ArrayList operations = new ArrayList();
070
071 public Tx(RecordLocation location) {
072 this.location=location;
073 }
074
075 public void add(JournalMessageStore store, ActiveMQMessage msg) {
076 operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
077 }
078
079 public void add(JournalMessageStore store, MessageAck ack) {
080 operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
081 }
082
083 public void add(JournalTopicMessageStore store, JournalAck ack) {
084 operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
085 }
086
087 public ActiveMQMessage[] getMessages() {
088 ArrayList list = new ArrayList();
089 for (Iterator iter = operations.iterator(); iter.hasNext();) {
090 TxOperation op = (TxOperation) iter.next();
091 if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
092 list.add(op.data);
093 }
094 }
095 ActiveMQMessage rc[] = new ActiveMQMessage[list.size()];
096 list.toArray(rc);
097 return rc;
098 }
099
100 public MessageAck[] getAcks() {
101 ArrayList list = new ArrayList();
102 for (Iterator iter = operations.iterator(); iter.hasNext();) {
103 TxOperation op = (TxOperation) iter.next();
104 if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
105 list.add(op.data);
106 }
107 }
108 MessageAck rc[] = new MessageAck[list.size()];
109 list.toArray(rc);
110 return rc;
111 }
112
113 public ArrayList getOperations() {
114 return operations;
115 }
116
117 }
118
119 public interface AddMessageCommand {
120 ActiveMQMessage getMessage();
121
122 void run() throws IOException;
123 }
124
125 public interface RemoveMessageCommand {
126 MessageAck getMessageAck();
127
128 void run() throws IOException;
129 }
130
/**
 * Creates a transaction store bound to the given persistence adapter.
 *
 * @param adapter the adapter whose {@code writeTxCommand} is used by
 *        prepare, commit and rollback
 */
public JournalTransactionStore(JournalPersistenceAdapter adapter) {
    peristenceAdapter = adapter;
}
134
135 /**
136 * @throws XAException
137 * @throws IOException
138 * @see org.activemq.store.TransactionStore#prepare(TransactionId)
139 */
140 public void prepare(Object txid) throws XAException {
141 Tx tx = (Tx) inflightTransactions.remove(txid);
142 if (tx == null)
143 return;
144 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_PREPARE, txid, false), true);
145 preparedTransactions.put(txid, tx);
146 }
147
148 /**
149 * @throws IOException
150 * @see org.activemq.store.TransactionStore#prepare(TransactionId)
151 */
152 public void replayPrepare(Object txid) throws IOException {
153 Tx tx = (Tx) inflightTransactions.remove(txid);
154 if (tx == null)
155 return;
156 preparedTransactions.put(txid, tx);
157 }
158
159 public Tx getTx(Object txid, RecordLocation location) {
160 Tx tx = (Tx) inflightTransactions.get(txid);
161 if (tx == null) {
162 tx = new Tx(location);
163 inflightTransactions.put(txid, tx);
164 }
165 return tx;
166 }
167
168 /**
169 * @throws XAException
170 * @throws XAException
171 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
172 */
173 public void commit(Object txid, boolean wasPrepared) throws XAException {
174 Tx tx;
175 if (wasPrepared) {
176 tx = (Tx) preparedTransactions.remove(txid);
177 } else {
178 tx = (Tx) inflightTransactions.remove(txid);
179 }
180
181 if (tx == null)
182 return;
183
184 if (txid.getClass() == ActiveMQXid.class ) {
185 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_COMMIT, txid, wasPrepared),
186 true);
187 } else {
188 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_COMMIT, txid, wasPrepared),
189 true);
190 }
191 }
192
193 /**
194 * @throws XAException
195 * @see org.activemq.store.TransactionStore#commit(org.activemq.service.Transaction)
196 */
197 public Tx replayCommit(Object txid, boolean wasPrepared) throws IOException {
198 if (wasPrepared) {
199 return (Tx) preparedTransactions.remove(txid);
200 } else {
201 return (Tx) inflightTransactions.remove(txid);
202 }
203 }
204
205 /**
206 * @throws XAException
207 * @throws IOException
208 * @see org.activemq.store.TransactionStore#rollback(TransactionId)
209 */
210 public void rollback(Object txid) throws XAException {
211
212 Tx tx = (Tx) inflightTransactions.remove(txid);
213 if (tx != null)
214 tx = (Tx) preparedTransactions.remove(txid);
215
216 if (tx != null) {
217 if (txid.getClass() == ActiveMQXid.class ) {
218 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.XA_ROLLBACK, txid, false),
219 true);
220 } else {
221 peristenceAdapter.writeTxCommand(new TxCommand(TxCommand.LOCAL_ROLLBACK, txid, false),
222 true);
223 }
224 }
225
226 }
227
228 /**
229 * @throws IOException
230 * @see org.activemq.store.TransactionStore#rollback(TransactionId)
231 */
232 public void replayRollback(Object txid) throws IOException {
233 Tx tx = (Tx) inflightTransactions.remove(txid);
234 if (tx != null)
235 tx = (Tx) preparedTransactions.remove(txid);
236 }
237
238 synchronized public void recover(RecoveryListener listener) throws XAException {
239 // All the inflight transactions get rolled back..
240 inflightTransactions.clear();
241 try {
242 for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
243 Object txid = (Object) iter.next();
244 Tx tx = (Tx) preparedTransactions.get(txid);
245 try {
246 listener.recover((ActiveMQXid) txid,tx.getMessages(), tx.getAcks());
247 } catch (JMSException e) {
248 throw (XAException)new XAException().initCause(e);
249 }
250 }
251 } finally {
252 }
253 }
254
255 /**
256 * @param message
257 * @throws IOException
258 */
259 void addMessage(JournalMessageStore store, ActiveMQMessage message, RecordLocation location) {
260 Tx tx = getTx(message.getTransactionId(), location);
261 tx.add(store, message);
262 }
263
264 /**
265 * @param ack
266 * @throws IOException
267 */
268 public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location) {
269 Tx tx = getTx(ack.getTransactionId(), location);
270 tx.add(store, ack);
271 }
272
273
274 public void acknowledge(JournalTopicMessageStore store, JournalAck ack, RecordLocation location) {
275 Tx tx = getTx(ack.getTransactionId(), location);
276 tx.add(store, ack);
277 }
278
279
280 public RecordLocation checkpoint() throws IOException {
281
282 // Nothing really to checkpoint.. since, we don't
283 // checkpoint tx operations in to long term store until they are committed.
284
285 // But we keep track of the first location of an operation
286 // that was associated with an active tx. The journal can not
287 // roll over active tx records.
288 RecordLocation rc = null;
289 for (Iterator iter = inflightTransactions.values().iterator(); iter.hasNext();) {
290 Tx tx = (Tx) iter.next();
291 RecordLocation location = tx.location;
292 if (rc == null || rc.compareTo(location) < 0) {
293 rc = location;
294 }
295 }
296 for (Iterator iter = preparedTransactions.values().iterator(); iter.hasNext();) {
297 Tx tx = (Tx) iter.next();
298 RecordLocation location = tx.location;
299 if (rc == null || rc.compareTo(location) < 0) {
300 rc = location;
301 }
302 }
303 return rc;
304 }
305
306 public void start() throws JMSException {
307 }
308
309 public void stop() throws JMSException {
310 }
311
312
313 }