001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.io.util;
020
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.List;
024
025 import javax.jms.JMSException;
026
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.activemq.service.QueueListEntry;
030 import org.activemq.service.impl.DefaultQueueList;
031
032 /**
033 * @author Ramzi Saba
034 *
035 * A prioritized version of the MemoryBoundedQueue supporting the 10 JMS priority levels
036 * 0-9, 0 being the lowest and 9 being the highest.
037 *
038 * @version $Revision: 1.1.1.1 $
039 */
040 public class MemoryBoundedPrioritizedQueue extends MemoryBoundedQueue {
041
042 private static final Log log = LogFactory.getLog(MemoryBoundedPrioritizedQueue.class);
043 private static final int DEFAULT_PRIORITY = 4;
044 private final DefaultQueueList[] prioritizedPackets = new DefaultQueueList[10]; // array of 10 prioritized queues
045
046 /**
047 * Constructor
048 *
049 * @param name
050 * @param manager
051 * @param name
052 */
053 public MemoryBoundedPrioritizedQueue(MemoryBoundedQueueManager manager, String name) {
054 super(manager, name);
055 for (int i=0; i<10; ++i) {
056 this.prioritizedPackets[i] = new DefaultQueueList();
057 }
058 }
059
060 /**
061 * @return the number of items held by this queue
062 */
063 public int size() {
064 //return internalList.size();
065 int size=0;
066 for (int j=0; j<10; ++j) {
067 size += prioritizedPackets[j].size();
068 }
069 return size;
070 }
071
072 /**
073 * Enqueue a MemoryManageable without checking memory usage limits
074 *
075 * @param packet
076 */
077 public void enqueueNoBlock(MemoryManageable packet) {
078 if (!closed) {
079 //internalList.add(packet);
080 prioritizedPackets[getPacketPriority(packet)].add(packet);
081 incrementMemoryUsed(packet);
082 synchronized (outLock) {
083 outLock.notify();
084 }
085 }
086 }
087
088 /**
089 * Enqueue a packet to the head of the queue with total disregard for memory constraints
090 *
091 * @param packet
092 */
093 public final void enqueueFirstNoBlock(MemoryManageable packet) {
094 if (!closed) {
095 //internalList.addFirst(packet);
096 prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
097 incrementMemoryUsed(packet);
098 synchronized (outLock) {
099 outLock.notify();
100 }
101 }
102 }
103
104 /**
105 * Enqueue an array of packets to the head of the queue with total disregard for memory constraints
106 *
107 * @param packets
108 */
109 public void enqueueAllFirstNoBlock(List packets) {
110 if (!closed) {
111 //internalList.addAllFirst(packets);
112 Iterator iterator = packets.iterator();
113 for (Iterator iter = packets.iterator(); iter.hasNext();) {
114 MemoryManageable packet = (MemoryManageable) iter.next();
115 prioritizedPackets[getPacketPriority(packet)].addFirst(packet);
116 incrementMemoryUsed(packet);
117 }
118 synchronized (outLock) {
119 outLock.notify();
120 }
121 }
122 }
123
124 /**
125 * @return the first dequeued MemoryManageable or blocks until one is available
126 * @throws InterruptedException
127 */
128 public MemoryManageable dequeue() throws InterruptedException {
129 MemoryManageable result = null;
130 synchronized (outLock) {
131 //while (internalList.isEmpty() && !closed) {
132 while (isEmpty() && !closed) {
133 outLock.wait(WAIT_TIMEOUT);
134 }
135 result = dequeueNoWait();
136 }
137 return result;
138 }
139
140 /**
141 * dequeues a MemoryManageable from the head of the queue
142 *
143 * @return the MemoryManageable at the head of the queue or null, if none is available
144 * @throws InterruptedException
145 */
146 public MemoryManageable dequeueNoWait() throws InterruptedException {
147 MemoryManageable packet = null;
148 synchronized (outLock) {
149 while (stopped && !closed) {
150 outLock.wait(WAIT_TIMEOUT);
151 }
152 }
153 //packet = (MemoryManageable) internalList.removeFirst();
154 for (int i=9; i>=0; --i) {
155 packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
156 if (packet != null) break;
157 }
158 decrementMemoryUsed(packet);
159 if (packet != null) {
160 synchronized (inLock) {
161 inLock.notify();
162 }
163 }
164 return packet;
165 }
166
167 /**
168 * Remove a packet from the queue
169 *
170 * @param packet
171 * @return true if the packet was found
172 */
173 public boolean remove(MemoryManageable packet) {
174 boolean result = false;
175 //if (!internalList.isEmpty()) {
176 if (!isEmpty()) {
177 //result = internalList.remove(packet);
178 result = prioritizedPackets[getPacketPriority(packet)].remove(packet);
179 }
180 if (result) {
181 decrementMemoryUsed(packet);
182 }
183 synchronized (inLock) {
184 inLock.notify();
185 }
186 return result;
187 }
188
189 /**
190 * Remove a MemoryManageable by it's id
191 *
192 * @param id
193 * @return
194 */
195 public MemoryManageable remove(Object id) {
196 MemoryManageable result = null;
197 for (int i=0; i<10; ++i) {
198 //QueueListEntry entry = internalList.getFirstEntry();
199 QueueListEntry entry = prioritizedPackets[i].getFirstEntry();
200 try {
201 while (entry != null) {
202 MemoryManageable p = (MemoryManageable) entry.getElement();
203 if (p.getMemoryId().equals(id)) {
204 result = p;
205 remove(p);
206 break;
207 }
208 //entry = internalList.getNextEntry(entry);
209 entry = prioritizedPackets[i].getNextEntry(entry);
210 }
211 }
212 catch (JMSException jmsEx) {
213 jmsEx.printStackTrace();
214 }
215 }
216 synchronized (inLock) {
217 inLock.notify();
218 }
219 return result;
220 }
221
222 /**
223 * remove any MemoryManageables in the queue
224 */
225 public void clear() {
226 //while (!internalList.isEmpty()) {
227 for (int i=0; i<10; ++i) {
228 while (!prioritizedPackets[i].isEmpty()) {
229 //MemoryManageable packet = (MemoryManageable) internalList.removeFirst();
230 MemoryManageable packet = (MemoryManageable) prioritizedPackets[i].removeFirst();
231 decrementMemoryUsed(packet);
232 }
233 }
234 synchronized (inLock) {
235 inLock.notifyAll();
236 }
237 }
238
239 /**
240 * @return true if the queue is empty
241 */
242 public boolean isEmpty() {
243 //return internalList.isEmpty();
244 for (int i=0; i<10; ++i) {
245 if (!prioritizedPackets[i].isEmpty()) return false;
246 }
247 return true;
248 }
249
250 /**
251 * retrieve a MemoryManageable at an indexed position in the queue
252 *
253 * @param index
254 * @return
255 */
256 public MemoryManageable get(int index) {
257 //return (MemoryManageable) internalList.get(index);
258 throw new UnsupportedOperationException("Cannot invoke this method on a MemoryBoundedPrioritizedQueue instance");
259 }
260
261 /**
262 * Retrieve a shallow copy of the contents as a list
263 *
264 * @return a list containing the bounded queue contents
265 */
266 public List getContents() {
267 //Object[] array = internalList.toArray();
268 List list = new ArrayList();
269 for (int j=9; j>=0; --j) {
270 Object[] array = prioritizedPackets[j].toArray();
271 for (int i = 0; i < array.length; i++) {
272 list.add(array[i]);
273 }
274 }
275 return list;
276 }
277
278 private int getPacketPriority(MemoryManageable packet) {
279 int priority=DEFAULT_PRIORITY;
280 if (packet.getPriority()>=0 || packet.getPriority()<=9) {
281 priority = packet.getPriority();
282 }
283 return priority;
284 }
285 }