001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.io.util;
019
020 import java.util.HashMap;
021 import java.util.Iterator;
022
023 import org.activemq.message.ActiveMQMessage;
024 import org.activemq.service.QueueListEntry;
025 import org.activemq.service.impl.DefaultQueueList;
026 import org.activemq.store.cache.MessageCache;
027
028 /**
029 * A simple cache that stores messages in memory. Cache entries are evicted
030 * when the memoryManager starts to run short on memory (A LRU cache is used).
031 *
032 * @version $Revision: 1.1.1.1 $
033 */
034 public class MemoryBoundedMessageCache implements MessageCache, MemoryBoundedObject {
035
036 private static final int OBJECT_OVERHEAD = 50;
037
038 private final MemoryBoundedObjectManager memoryManager;
039
040 /** msgId -> LRUNode */
041 private final HashMap messages = new HashMap();
042 /** Ordered list of messageIds recently used at the front */
043 private final DefaultQueueList lruList = new DefaultQueueList();
044
045 private int memoryUsedByThisCache;
046 private float growthLimit = 0.75f;
047 private boolean closed;
048
049 /** Used associate the a Message to it's QueueListEntry in the lruList */
050 private static class CacheNode {
051 ActiveMQMessage message;
052 QueueListEntry entry;
053 }
054
055 public MemoryBoundedMessageCache(MemoryBoundedObjectManager memoryManager) {
056 this.memoryManager = memoryManager;
057 this.memoryManager.add(this);
058 }
059
060 /**
061 * Gets a message that was previously <code>put</code> into this object.
062 *
063 * @param msgid
064 * @return null if the message was not previously put or if the message has expired out of the cache.
065 */
066 synchronized public ActiveMQMessage get(String msgid) {
067 CacheNode rc = (CacheNode) messages.get(msgid);
068 if( rc != null ) {
069 // Move to front (the recently used part of the list).
070 lruList.remove(rc.entry);
071 rc.entry = lruList.addFirst(msgid);
072 return rc.message;
073 }
074 return null;
075 }
076
077 /**
078 * Puts a message into the cache.
079 *
080 * @param messageID
081 * @param message
082 */
083 synchronized public void put(String messageID, ActiveMQMessage message) {
084
085 // Drop old messages until there is space.
086 while( isFull() && !messages.isEmpty() ) {
087 removeOldest();
088 }
089
090 if( !isFull() ) {
091 incrementMemoryUsed(message);
092 CacheNode newNode = new CacheNode();
093 newNode.message = message;
094 newNode.entry = lruList.addFirst(messageID);
095 CacheNode oldNode = (CacheNode) messages.put(messageID, newNode);
096 if( oldNode !=null ) {
097 lruList.remove(oldNode);
098 decrementMemoryUsed(oldNode.message);
099 }
100 }
101 }
102
103 private void removeOldest() {
104 String messageID = (String) lruList.removeLast();
105 CacheNode node = (CacheNode) messages.remove(messageID);
106 decrementMemoryUsed(node.message);
107 }
108
109 private boolean isFull() {
110 return memoryManager.getPercentFull() > growthLimit;
111 }
112
113 /**
114 * Remvoes a message from the cache.
115 *
116 * @param messageID
117 */
118 synchronized public void remove(String messageID) {
119 CacheNode node = (CacheNode) messages.remove(messageID);
120 if( node !=null ) {
121 lruList.remove(node.entry);
122 decrementMemoryUsed(node.message);
123 }
124 }
125
126 private void incrementMemoryUsed(ActiveMQMessage packet) {
127 if (packet != null) {
128 int size = OBJECT_OVERHEAD;
129 if (packet != null) {
130 if (packet.incrementMemoryReferenceCount() == 1) {
131 size += packet.getMemoryUsage();
132 }
133 }
134 synchronized( this ) {
135 memoryUsedByThisCache += size;
136 }
137 memoryManager.incrementMemoryUsed(size);
138 }
139 }
140
141 private void decrementMemoryUsed(ActiveMQMessage packet) {
142 if (packet != null) {
143 int size = OBJECT_OVERHEAD;
144 if (packet != null) {
145 if (packet.decrementMemoryReferenceCount() == 0) {
146 size += packet.getMemoryUsage();
147 }
148 }
149
150 synchronized( this ) {
151 memoryUsedByThisCache -= size;
152 }
153 memoryManager.decrementMemoryUsed(size);
154 }
155 }
156
157 /**
158 * @return returns the percentage of memory usage at which that cache will stop to grow.
159 */
160 public float getGrowthLimit() {
161 return growthLimit;
162 }
163
164 /**
165 * @param growTillFence the percentage of memory usage at which that cache will stop to grow.
166 */
167 public void setGrowthLimit(float growTillFence) {
168 this.growthLimit = growTillFence;
169 }
170
171 synchronized public void close() {
172 if( closed )
173 return;
174 closed=true;
175
176 for (Iterator iter = messages.values().iterator(); iter.hasNext();) {
177 CacheNode node = (CacheNode) iter.next();
178 decrementMemoryUsed(node.message);
179 }
180 messages.clear();
181 lruList.clear();
182
183 memoryManager.remove(this);
184 }
185 }