001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 * Copyright 2004 Hiram Chirino
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019
020 package org.activemq.io.util;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.List;
024
025 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
026
027 /**
028 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active
029 * MemoryBoundedQueues created by this instance stays within the memory usage bounds set.
030 *
031 * @version $Revision: 1.1.1.1 $
032 */
033 public class MemoryBoundedQueueManager {
034
035 private final ConcurrentHashMap activeQueues = new ConcurrentHashMap();
036 private final MemoryBoundedObjectManager memoryManager;
037
038 /**
039 * @param name
040 * @param maxSize
041 */
042 public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) {
043 this.memoryManager = memoryManager;
044 }
045
046 /**
047 * retrieve a named MemoryBoundedQueue or creates one if not found
048 *
049 * @param name
050 * @return an named instance of a MemoryBoundedQueue
051 */
052 public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
053 MemoryBoundedQueue result = (MemoryBoundedQueue) activeQueues.get(name);
054 if (result == null) {
055 if (memoryManager.isSupportJMSPriority())
056 result = new MemoryBoundedPrioritizedQueue(this, name);
057 else
058 result = new MemoryBoundedQueue(this, name);
059 activeQueues.put(name, result);
060 }
061 return result;
062 }
063
064 /**
065 * close this queue manager and all associated MemoryBoundedQueues
066 */
067 public void close() {
068 memoryManager.close();
069 }
070
071 /**
072 * @return Returns the memoryManager.
073 */
074 public MemoryBoundedObjectManager getMemoryManager() {
075 return memoryManager;
076 }
077
078 public int getCurrentCapacity() {
079 return memoryManager.getCurrentCapacity();
080 }
081
082 public void add(MemoryBoundedQueue queue) {
083 memoryManager.add(queue);
084 }
085
086 public void remove(MemoryBoundedQueue queue) {
087 memoryManager.remove(queue);
088 activeQueues.remove(queue.getName());
089 }
090
091 public boolean isFull() {
092 return memoryManager.isFull();
093 }
094
095 public void incrementMemoryUsed(int size) {
096 memoryManager.incrementMemoryUsed(size);
097 }
098
099 public void decrementMemoryUsed(int size) {
100 memoryManager.decrementMemoryUsed(size);
101 }
102
103 public List getMemoryBoundedQueues(){
104 return new ArrayList(activeQueues.values());
105 }
106
107 public String dumpContents(){
108 String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n";
109 for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){
110 MemoryBoundedQueue q = (MemoryBoundedQueue)i.next();
111 result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n";
112 }
113 result += "\n\n";
114 return result;
115 }
116
117 }