001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 * Copyright 2004 Protique Ltd
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019 package org.activemq.store.journal;
020
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.File;
024 import java.io.IOException;
025 import java.util.ArrayList;
026 import java.util.Iterator;
027 import java.util.Map;
028
029 import javax.jms.JMSException;
030 import javax.transaction.xa.XAException;
031
032 import org.activeio.adapter.PacketByteArrayOutputStream;
033 import org.activeio.adapter.PacketInputStream;
034 import org.activeio.journal.InvalidRecordLocationException;
035 import org.activeio.journal.Journal;
036 import org.activeio.journal.JournalEventListener;
037 import org.activeio.journal.RecordLocation;
038 import org.activeio.journal.active.JournalImpl;
039 import org.activeio.journal.howl.HowlJournal;
040 import org.activemq.io.WireFormat;
041 import org.activemq.io.impl.StatelessDefaultWireFormat;
042 import org.activemq.message.ActiveMQMessage;
043 import org.activemq.message.ActiveMQXid;
044 import org.activemq.message.MessageAck;
045 import org.activemq.message.Packet;
046 import org.activemq.service.MessageIdentity;
047 import org.activemq.store.MessageStore;
048 import org.activemq.store.PersistenceAdapter;
049 import org.activemq.store.TopicMessageStore;
050 import org.activemq.store.TransactionStore;
051 import org.activemq.store.jdbc.JDBCPersistenceAdapter;
052 import org.activemq.store.journal.JournalTransactionStore.Tx;
053 import org.activemq.store.journal.JournalTransactionStore.TxOperation;
054 import org.activemq.util.JMSExceptionHelper;
055 import org.apache.commons.logging.Log;
056 import org.apache.commons.logging.LogFactory;
057 import org.objectweb.howl.log.Configuration;
058
059 import EDU.oswego.cs.dl.util.concurrent.Channel;
060 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
061 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
062 import EDU.oswego.cs.dl.util.concurrent.Latch;
063 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
064 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
065 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
066
067 /**
068 * An implementation of {@link PersistenceAdapter} designed for
069 * use with a {@link Journal} and then checkpointing asynchronously
070 * on a timeout with some other long term persistent storage.
071 *
072 * @version $Revision: 1.1 $
073 */
074 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {
075
076 private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
077 public static final String DEFAULT_JOURNAL_TYPE = "default";
078 public static final String HOWL_JOURNAL_TYPE = "howl";
079
080 private Journal journal;
081 private String journalType = DEFAULT_JOURNAL_TYPE;
082 private PersistenceAdapter longTermPersistence;
083 private File directory = new File("logs");
084 private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
085 private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
086 private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();
087
088 private static final int PACKET_RECORD_TYPE = 0;
089 private static final int COMMAND_RECORD_TYPE = 1;
090 private static final int TX_COMMAND_RECORD_TYPE = 2;
091 private static final int ACK_RECORD_TYPE = 3;
092
093 private Channel checkpointRequests = new LinkedQueue();
094 private QueuedExecutor checkpointExecutor;
095 ClockDaemon clockDaemon;
096 private Object clockTicket;
097 private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
098 private int logFileSize=1024*1024*20;
099 private int logFileCount=2;
100 private long checkpointInterval = 1000 * 60 * 5;
101
102 public JournalPersistenceAdapter() {
103 checkpointExecutor = new QueuedExecutor(new LinkedQueue());
104 checkpointExecutor.setThreadFactory(new ThreadFactory() {
105 public Thread newThread(Runnable runnable) {
106 Thread answer = new Thread(runnable, "Checkpoint Worker");
107 answer.setDaemon(true);
108 answer.setPriority(Thread.MAX_PRIORITY);
109 return answer;
110 }
111 });
112 }
113
114 public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
115 this();
116 this.directory = directory;
117 this.longTermPersistence = longTermPersistence;
118 }
119
120 public Map getInitialDestinations() {
121 return longTermPersistence.getInitialDestinations();
122 }
123
124 private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
125 if(isQueue) {
126 return createQueueMessageStore(destination);
127 } else {
128 return createTopicMessageStore(destination);
129 }
130 }
131
132 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
133 JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName);
134 if( store == null ) {
135 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
136 store = new JournalMessageStore(this, checkpointStore, destinationName);
137 messageStores.put(destinationName, store);
138 }
139 return store;
140 }
141
142 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
143 JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName);
144 if( store == null ) {
145 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
146 store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
147 topicMessageStores.put(destinationName, store);
148 }
149 return store;
150 }
151
152 public TransactionStore createTransactionStore() throws JMSException {
153 return transactionStore;
154 }
155
156 public void beginTransaction() throws JMSException {
157 longTermPersistence.beginTransaction();
158 }
159
160 public void commitTransaction() throws JMSException {
161 longTermPersistence.commitTransaction();
162 }
163
164 public void rollbackTransaction() {
165 longTermPersistence.rollbackTransaction();
166 }
167
168 public synchronized void start() throws JMSException {
169
170 if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
171 // Disabled periodic clean up as it deadlocks with the checkpoint operations.
172 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
173 }
174
175 longTermPersistence.start();
176 createTransactionStore();
177 if (journal == null) {
178 try {
179 log.info("Opening journal.");
180 journal = createJournal();
181 log.info("Opened journal: " + journal);
182 journal.setJournalEventListener(this);
183 }
184 catch (Exception e) {
185 throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
186 }
187 try {
188 recover();
189 }
190 catch (Exception e) {
191 throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
192 }
193 }
194
195 // Do a checkpoint periodically.
196 clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() {
197 public void run() {
198 checkpoint(false);
199 }
200 }, false);
201
202 }
203
204 public synchronized void stop() throws JMSException {
205
206 if (clockTicket != null) {
207 // Stop the periodical checkpoint.
208 ClockDaemon.cancel(clockTicket);
209 clockTicket=null;
210 clockDaemon.shutDown();
211 }
212
213 // Take one final checkpoint and stop checkpoint processing.
214 checkpoint(true);
215 checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
216
217 JMSException firstException = null;
218 if (journal != null) {
219 try {
220 journal.close();
221 journal = null;
222 }
223 catch (Exception e) {
224 firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
225 }
226 }
227 longTermPersistence.stop();
228
229 if (firstException != null) {
230 throw firstException;
231 }
232 }
233
234 // Properties
235 //-------------------------------------------------------------------------
236 public PersistenceAdapter getLongTermPersistence() {
237 return longTermPersistence;
238 }
239
240 public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
241 this.longTermPersistence = longTermPersistence;
242 }
243
244 /**
245 * @return Returns the directory.
246 */
247 public File getDirectory() {
248 return directory;
249 }
250
251 /**
252 * @param directory The directory to set.
253 */
254 public void setDirectory(File directory) {
255 this.directory = directory;
256 }
257
258 /**
259 * @return Returns the wireFormat.
260 */
261 public WireFormat getWireFormat() {
262 return wireFormat;
263 }
264
265 public String getJournalType() {
266 return journalType;
267 }
268
269 public void setJournalType(String journalType) {
270 this.journalType = journalType;
271 }
272
273 protected Journal createJournal() throws IOException {
274 if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) {
275 return new JournalImpl(directory,logFileCount,logFileSize);
276 }
277
278 if( HOWL_JOURNAL_TYPE.equals(journalType) ) {
279 try {
280 Configuration config = new Configuration();
281 config.setLogFileDir(directory.getCanonicalPath());
282 return new HowlJournal(config);
283 } catch (IOException e) {
284 throw e;
285 } catch (Exception e) {
286 throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e);
287 }
288 }
289
290 throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType);
291 }
292
293 // Implementation methods
294 //-------------------------------------------------------------------------
295
296 /**
297 * The Journal give us a call back so that we can move old data out of the journal.
298 * Taking a checkpoint does this for us.
299 *
300 * @see org.activemq.journal.JournalEventListener#overflowNotification(org.activemq.journal.RecordLocation)
301 */
302 public void overflowNotification(RecordLocation safeLocation) {
303 checkpoint(false);
304 }
305
306 /**
307 * When we checkpoint we move all the journaled data to long term storage.
308 * @param b
309 */
310 public void checkpoint(boolean sync) {
311 try {
312
313 if( journal == null )
314 throw new IllegalStateException("Journal is closed.");
315
316 // Do the checkpoint asynchronously?
317 Latch latch=null;
318 if( sync ) {
319 latch = new Latch();
320 checkpointRequests.put(latch);
321 } else {
322 checkpointRequests.put(Boolean.TRUE);
323 }
324
325 checkpointExecutor.execute(new Runnable() {
326 public void run() {
327
328 ArrayList listners = new ArrayList();
329
330 try {
331 // Avoid running a checkpoint too many times in a row.
332 // Consume any queued up checkpoint requests.
333 try {
334 boolean requested = false;
335 Object t;
336 while ((t=checkpointRequests.poll(0)) != null) {
337 if( t.getClass()==Latch.class )
338 listners.add(t);
339 requested = true;
340 }
341 if (!requested) {
342 return;
343 }
344 }
345 catch (InterruptedException e1) {
346 return;
347 }
348
349 log.debug("Checkpoint started.");
350 RecordLocation newMark = null;
351
352 Iterator iterator = messageStores.values().iterator();
353 while (iterator.hasNext()) {
354 try {
355 JournalMessageStore ms = (JournalMessageStore) iterator.next();
356 RecordLocation mark = ms.checkpoint();
357 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
358 newMark = mark;
359 }
360 }
361 catch (Exception e) {
362 log.error("Failed to checkpoint a message store: " + e, e);
363 }
364 }
365
366 iterator = topicMessageStores.values().iterator();
367 while (iterator.hasNext()) {
368 try {
369 JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
370 RecordLocation mark = ms.checkpoint();
371 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
372 newMark = mark;
373 }
374 }
375 catch (Exception e) {
376 log.error("Failed to checkpoint a message store: " + e, e);
377 }
378 }
379
380 try {
381 if (newMark != null) {
382 if( log.isDebugEnabled() )
383 log.debug("Marking journal: "+newMark);
384 journal.setMark(newMark, true);
385 }
386 }
387 catch (Exception e) {
388 log.error("Failed to mark the Journal: " + e, e);
389 }
390
391 // Clean up the DB if it's a JDBC store.
392 if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
393 // Disabled periodic clean up as it deadlocks with the checkpoint operations.
394 ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
395 }
396
397 log.debug("Checkpoint done.");
398 } finally {
399 for (Iterator iter = listners.iterator(); iter.hasNext();) {
400 Latch latch = (Latch) iter.next();
401 latch.release();
402 }
403 }
404 }
405 });
406
407 if( sync ) {
408 latch.acquire();
409 }
410 }
411 catch (InterruptedException e) {
412 log.warn("Request to start checkpoint failed: " + e, e);
413 }
414 }
415
416 /**
417 * @param destinationName
418 * @param message
419 * @param sync
420 * @throws JMSException
421 */
422 public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
423 try {
424
425 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
426 DataOutputStream os = new DataOutputStream(pos);
427 os.writeByte(PACKET_RECORD_TYPE);
428 os.writeUTF(destination);
429 os.close();
430 org.activeio.Packet p = wireFormat.writePacket(packet, pos);
431 return journal.write(p, sync);
432 }
433 catch (IOException e) {
434 throw createWriteException(packet, e);
435 }
436 }
437
438 /**
439 * @param destinationName
440 * @param message
441 * @param sync
442 * @throws JMSException
443 */
444 public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
445 try {
446
447 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
448 DataOutputStream os = new DataOutputStream(pos);
449 os.writeByte(COMMAND_RECORD_TYPE);
450 os.writeUTF(command);
451 os.close();
452 return journal.write(pos.getPacket(), sync);
453
454 }
455 catch (IOException e) {
456 throw createWriteException(command, e);
457 }
458 }
459
460 /**
461 * @param location
462 * @return
463 * @throws JMSException
464 */
465 public Packet readPacket(RecordLocation location) throws JMSException {
466 try {
467 org.activeio.Packet data = journal.read(location);
468 DataInputStream is = new DataInputStream(new PacketInputStream(data));
469 byte type = is.readByte();
470 if (type != PACKET_RECORD_TYPE) {
471 throw new IOException("Record is not a packet type.");
472 }
473 String destination = is.readUTF();
474 Packet packet = wireFormat.readPacket(data);
475 is.close();
476 return packet;
477
478 }
479 catch (InvalidRecordLocationException e) {
480 throw createReadException(location, e);
481 }
482 catch (IOException e) {
483 throw createReadException(location, e);
484 }
485 }
486
487
488 /**
489 * Move all the messages that were in the journal into long term storeage. We just replay and do a checkpoint.
490 *
491 * @throws JMSException
492 * @throws IOException
493 * @throws InvalidRecordLocationException
494 * @throws IllegalStateException
495 */
496 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
497
498 RecordLocation pos = null;
499 int transactionCounter = 0;
500
501 log.info("Journal Recovery Started.");
502
503 // While we have records in the journal.
504 while ((pos = journal.getNextRecordLocation(pos)) != null) {
505 org.activeio.Packet data = journal.read(pos);
506 DataInputStream is = new DataInputStream(new PacketInputStream(data));
507
508 // Read the destination and packate from the record.
509 String destination = null;
510 Packet packet = null;
511 try {
512 byte type = is.readByte();
513 switch (type) {
514 case PACKET_RECORD_TYPE:
515
516 // Is the current packet part of the destination?
517 destination = is.readUTF();
518 packet = wireFormat.readPacket(data);
519
520 // Try to replay the packet.
521 if (packet instanceof ActiveMQMessage) {
522 ActiveMQMessage msg = (ActiveMQMessage) packet;
523
524 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
525 if( msg.getTransactionId()!=null ) {
526 transactionStore.addMessage(store, msg, pos);
527 } else {
528 store.replayAddMessage(msg);
529 transactionCounter++;
530 }
531 }
532 else if (packet instanceof MessageAck) {
533 MessageAck ack = (MessageAck) packet;
534 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue());
535 if( ack.getTransactionId()!=null ) {
536 transactionStore.removeMessage(store, ack, pos);
537 } else {
538 store.replayRemoveMessage(ack);
539 transactionCounter++;
540 }
541 }
542 else {
543 log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
544 }
545
546 break;
547 case TX_COMMAND_RECORD_TYPE:
548
549 TxCommand command = new TxCommand();
550 command.setType(is.readByte());
551 command.setWasPrepared(is.readBoolean());
552 switch(command.getType()) {
553 case TxCommand.LOCAL_COMMIT:
554 case TxCommand.LOCAL_ROLLBACK:
555 command.setTransactionId(is.readUTF());
556 break;
557 default:
558 command.setTransactionId(ActiveMQXid.read(is));
559 break;
560 }
561
562 // Try to replay the packet.
563 switch(command.getType()) {
564 case TxCommand.XA_PREPARE:
565 transactionStore.replayPrepare(command.getTransactionId());
566 break;
567 case TxCommand.XA_COMMIT:
568 case TxCommand.LOCAL_COMMIT:
569 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
570 // Replay the committed operations.
571 if( tx!=null) {
572 tx.getOperations();
573 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
574 TxOperation op = (TxOperation) iter.next();
575 if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) {
576 op.store.replayAddMessage((ActiveMQMessage) op.data);
577 }
578 if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
579 op.store.replayRemoveMessage((MessageAck) op.data);
580 }
581 if( op.operationType == TxOperation.ACK_OPERATION_TYPE) {
582 JournalAck ack = (JournalAck) op.data;
583 ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
584 }
585 }
586 transactionCounter++;
587 }
588 break;
589 case TxCommand.LOCAL_ROLLBACK:
590 case TxCommand.XA_ROLLBACK:
591 transactionStore.replayRollback(command.getTransactionId());
592 break;
593 }
594
595 break;
596
597 case ACK_RECORD_TYPE:
598
599 destination = is.readUTF();
600 String subscription = is.readUTF();
601 String messageId = is.readUTF();
602 Object transactionId=null;
603
604 JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
605 if( transactionId!=null ) {
606 JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId);
607 transactionStore.acknowledge(store, ack, pos);
608 } else {
609 store.replayAcknowledge(subscription, new MessageIdentity(messageId));
610 transactionCounter++;
611 }
612
613 case COMMAND_RECORD_TYPE:
614
615 break;
616 default:
617 log.error("Unknown type of record in transaction log which will be discarded: " + type);
618 break;
619 }
620 }
621 finally {
622 is.close();
623 }
624 }
625
626 RecordLocation location = writeCommand("RECOVERED", true);
627 journal.setMark(location, true);
628
629 log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
630 }
631
632 private JMSException createReadException(RecordLocation location, Exception e) {
633 return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
634 }
635
636 protected JMSException createWriteException(Packet packet, Exception e) {
637 return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
638 }
639
640 private XAException createWriteException(TxCommand command, Exception e) {
641 return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
642 }
643
644
645 protected JMSException createWriteException(String command, Exception e) {
646 return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
647 }
648
649 protected JMSException createRecoveryFailedException(Exception e) {
650 return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
651 }
652
653 public ClockDaemon getClockDaemon() {
654 if (clockDaemon == null) {
655 clockDaemon = new ClockDaemon();
656 clockDaemon.setThreadFactory(new ThreadFactory() {
657 public Thread newThread(Runnable runnable) {
658 Thread thread = new Thread(runnable, "Checkpoint Timmer");
659 thread.setDaemon(true);
660 return thread;
661 }
662 });
663 }
664 return clockDaemon;
665 }
666
667 public void setClockDaemon(ClockDaemon clockDaemon) {
668 this.clockDaemon = clockDaemon;
669 }
670
671 /**
672 * @param xid
673 * @return
674 */
675 public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
676 try {
677
678 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
679 DataOutputStream os = new DataOutputStream(pos);
680 os.writeByte(TX_COMMAND_RECORD_TYPE);
681 os.writeByte(command.getType());
682 os.writeBoolean(command.getWasPrepared());
683 switch(command.getType()) {
684 case TxCommand.LOCAL_COMMIT:
685 case TxCommand.LOCAL_ROLLBACK:
686 os.writeUTF( (String) command.getTransactionId() );
687 break;
688 default:
689 ActiveMQXid xid = (ActiveMQXid) command.getTransactionId();
690 xid.write(os);
691 break;
692 }
693 os.close();
694 return journal.write(pos.getPacket(), sync);
695 }
696 catch (IOException e) {
697 throw createWriteException(command, e);
698 }
699 }
700
701 /**
702 * @param destinationName
703 * @param persistentKey
704 * @param messageIdentity
705 * @param b
706 * @return
707 */
708 public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{
709 try {
710
711 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
712 DataOutputStream os = new DataOutputStream(pos);
713 os.writeByte(ACK_RECORD_TYPE);
714 os.writeUTF(destinationName);
715 os.writeUTF(subscription);
716 os.writeUTF(messageIdentity.getMessageID());
717 os.close();
718 return journal.write(pos.getPacket(), sync);
719
720 }
721 catch (IOException e) {
722 throw createWriteException("Ack for message: "+messageIdentity, e);
723 }
724 }
725
726 public JournalTransactionStore getTransactionStore() {
727 return transactionStore;
728 }
729
730 public int getLogFileCount() {
731 return logFileCount;
732 }
733
734 public void setLogFileCount(int logFileCount) {
735 this.logFileCount = logFileCount;
736 }
737
738 public int getLogFileSize() {
739 return logFileSize;
740 }
741
742 public void setLogFileSize(int logFileSize) {
743 this.logFileSize = logFileSize;
744 }
745
746 public long getCheckpointInterval() {
747 return checkpointInterval;
748 }
749 public void setCheckpointInterval(long checkpointInterval) {
750 this.checkpointInterval = checkpointInterval;
751 }
752 }