001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.store.jdbc;
019
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Locale;
import java.util.Map;

import javax.jms.JMSException;
import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.io.WireFormat;
import org.activemq.io.impl.StatelessDefaultWireFormat;
import org.activemq.store.MessageStore;
import org.activemq.store.PersistenceAdapter;
import org.activemq.store.TopicMessageStore;
import org.activemq.store.TransactionStore;
import org.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.activemq.store.vm.VMTransactionStore;
import org.activemq.util.FactoryFinder;
import org.activemq.util.JMSExceptionHelper;

import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
042
043 /**
044 * A {@link PersistenceAdapter} implementation using JDBC for
045 * persistence storage.
046 *
047 * This persistence adapter will correctly remember prepared XA transactions,
048 * but it will not keep track of local transaction commits so that operations
049 * performed against the Message store are done as a single uow.
050 *
051 * @version $Revision: 1.1 $
052 */
053 public class JDBCPersistenceAdapter implements PersistenceAdapter {
054
055 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
056 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/activemq/store/jdbc/");
057
058 private WireFormat wireFormat = new StatelessDefaultWireFormat();
059 private DataSource dataSource;
060 private JDBCAdapter adapter;
061 private String adapterClass;
062 private VMTransactionStore transactionStore;
063 private boolean dropTablesOnStartup=false;
064 private ClockDaemon clockDaemon;
065 private Object clockTicket;
066 private int cleanupPeriod = 1000 * 60 * 5;
067
068 public JDBCPersistenceAdapter() {
069 }
070
071 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
072 this.dataSource = ds;
073 this.wireFormat = wireFormat;
074 }
075
076 public Map getInitialDestinations() {
077 return null;
078 }
079
080 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
081 if (adapter == null) {
082 throw new IllegalStateException("Not started");
083 }
084 MessageStore store = new JDBCMessageStore(this, adapter, wireFormat.copy(), destinationName);
085 if( transactionStore!=null ) {
086 store = transactionStore.proxy(store);
087 }
088 return store;
089 }
090
091 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
092 if (adapter == null) {
093 throw new IllegalStateException("Not started");
094 }
095 TopicMessageStore store = new JDBCTopicMessageStore(this, adapter, wireFormat.copy(), destinationName);
096 if( transactionStore!=null ) {
097 store = transactionStore.proxy(store);
098 }
099 return store;
100 }
101
102 public TransactionStore createTransactionStore() throws JMSException {
103 if (adapter == null) {
104 throw new IllegalStateException("Not started");
105 }
106 if( this.transactionStore == null ) {
107 this.transactionStore = new VMTransactionStore();
108 }
109 return this.transactionStore;
110 }
111
112 public void beginTransaction() throws JMSException {
113 try {
114 Connection c = dataSource.getConnection();
115 c.setAutoCommit(false);
116 TransactionContext.pushConnection(c);
117 }
118 catch (SQLException e) {
119 throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
120 }
121 }
122
123 public void commitTransaction() throws JMSException {
124 Connection c = TransactionContext.popConnection();
125 if (c == null) {
126 log.warn("Commit while no transaction in progress");
127 }
128 else {
129 try {
130 c.commit();
131 }
132 catch (SQLException e) {
133 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
134 }
135 finally {
136 try {
137 c.close();
138 }
139 catch (Throwable e) {
140 }
141 }
142 }
143 }
144
145 public void rollbackTransaction() {
146 Connection c = TransactionContext.popConnection();
147 try {
148 c.rollback();
149 }
150 catch (SQLException e) {
151 log.warn("Cannot rollback transaction due to: " + e, e);
152 }
153 finally {
154 try {
155 c.close();
156 }
157 catch (Throwable e) {
158 }
159 }
160 }
161
162
163 public void start() throws JMSException {
164 beginTransaction();
165 Connection c = null;
166 try {
167 // Load the right adapter for the database
168 adapter = null;
169
170 try {
171 c = getConnection();
172 }
173 catch (SQLException e) {
174 throw JMSExceptionHelper.newJMSException("Could not get a database connection: "+e,e);
175 }
176
177 // If the adapter class is not specified.. try to dectect they right type by getting
178 // info from the database.
179 if( adapterClass == null ) {
180
181 try {
182
183 // Make the filename file system safe.
184 String dirverName = c.getMetaData().getDriverName();
185 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
186
187 try {
188 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
189 log.info("Database driver recognized: [" + dirverName + "]");
190 }
191 catch (Throwable e) {
192 log.warn("Database driver NOT recognized: [" + dirverName + "]. Will use default JDBC implementation.");
193 }
194
195 }
196 catch (SQLException e) {
197 log.warn("JDBC error occured while trying to detect database type. Will use default JDBC implementation: "+e.getMessage());
198 log.debug("Reason: " + e, e);
199 }
200
201 } else {
202 try {
203 Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass);
204 adapter = (DefaultJDBCAdapter)clazz.newInstance();
205 }
206 catch (Throwable e) {
207 log.warn("Invalid JDBC adapter class class (" + adapterClass + "). Will use default JDBC implementation.");
208 log.debug("Reason: " + e, e);
209 }
210 }
211
212 // Use the default JDBC adapter if the
213 // Database type is not recognized.
214 if (adapter == null) {
215 adapter = new DefaultJDBCAdapter();
216 }
217
218 if( dropTablesOnStartup ) {
219 try {
220 adapter.doDropTables(c);
221 }
222 catch (SQLException e) {
223 log.warn("Cannot drop tables due to: " + e, e);
224 }
225 }
226 try {
227 adapter.doCreateTables(c);
228 }
229 catch (SQLException e) {
230 log.warn("Cannot create tables due to: " + e, e);
231 }
232 adapter.initSequenceGenerator(c);
233
234 }
235 finally {
236 commitTransaction();
237 }
238
239 // Cleanup the db periodically.
240 if( cleanupPeriod > 0 ) {
241 clockTicket = getClockDaemon().executePeriodically(cleanupPeriod, new Runnable() {
242 public void run() {
243 cleanup();
244 }
245 }, false);
246 }
247 cleanup();
248 }
249
250 public void cleanup() {
251 Connection c = null;
252 try {
253 log.debug("Cleaning up old messages.");
254 c = getConnection();
255 adapter.doDeleteOldMessages(c);
256 } catch (JMSException e) {
257 log.warn("Old message cleanup failed due to: " + e, e);
258 } catch (SQLException e) {
259 log.warn("Old message cleanup failed due to: " + e, e);
260 } finally {
261 returnConnection(c);
262 log.debug("Cleanup done.");
263 }
264 }
265
266 public void setClockDaemon(ClockDaemon clockDaemon) {
267 this.clockDaemon = clockDaemon;
268 }
269
270 public ClockDaemon getClockDaemon() {
271 if (clockDaemon == null) {
272 clockDaemon = new ClockDaemon();
273 clockDaemon.setThreadFactory(new ThreadFactory() {
274 public Thread newThread(Runnable runnable) {
275 Thread thread = new Thread(runnable, "Cleanup Timmer");
276 thread.setDaemon(true);
277 return thread;
278 }
279 });
280 }
281 return clockDaemon;
282 }
283
284 public synchronized void stop() throws JMSException {
285 if (clockTicket != null) {
286 // Stop the periodical cleanup.
287 ClockDaemon.cancel(clockTicket);
288 clockTicket=null;
289 clockDaemon.shutDown();
290 }
291 }
292
293 public DataSource getDataSource() {
294 return dataSource;
295 }
296
297 public void setDataSource(DataSource dataSource) {
298 this.dataSource = dataSource;
299 }
300
301 public WireFormat getWireFormat() {
302 return wireFormat;
303 }
304
305 public void setWireFormat(WireFormat wireFormat) {
306 this.wireFormat = wireFormat;
307 }
308
309 public Connection getConnection() throws SQLException {
310 Connection answer = TransactionContext.peekConnection();
311 if (answer == null) {
312 answer = dataSource.getConnection();
313 answer.setAutoCommit(true);
314 }
315 return answer;
316 }
317
318 public void returnConnection(Connection connection) {
319 if (connection == null) {
320 return;
321 }
322 Connection peek = TransactionContext.peekConnection();
323 if (peek != connection) {
324 try {
325 connection.close();
326 }
327 catch (SQLException e) {
328 }
329 }
330 }
331
332 /**
333 * @return Returns the adapterClass.
334 */
335 public String getAdapterClass() {
336 return adapterClass;
337 }
338
339 /**
340 * @param adapterClass The adapterClass to set.
341 */
342 public void setAdapterClass(String adapterClass) {
343 this.adapterClass = adapterClass;
344 }
345 /**
346 * @return Returns the dropTablesOnStartup.
347 */
348 public boolean getDropTablesOnStartup() {
349 return dropTablesOnStartup;
350 }
351 /**
352 * @param dropTablesOnStartup The dropTablesOnStartup to set.
353 */
354 public void setDropTablesOnStartup(boolean dropTablesOnStartup) {
355 this.dropTablesOnStartup = dropTablesOnStartup;
356 }
357
358 public int getCleanupPeriod() {
359 return cleanupPeriod;
360 }
361
362 public void setCleanupPeriod(int cleanupPeriod) {
363 this.cleanupPeriod = cleanupPeriod;
364 }
365 }