001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 * Copyright 2005 Hiram Chirino
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019
020 package org.activemq.service.boundedvm;
021 import org.activemq.broker.BrokerClient;
022 import org.activemq.filter.Filter;
023 import org.activemq.io.util.MemoryBoundedQueue;
024 import org.activemq.io.util.MemoryBoundedQueueManager;
025 import org.activemq.io.util.MemoryManageable;
026 import org.activemq.message.ActiveMQDestination;
027 import org.activemq.message.ActiveMQMessage;
028 import org.activemq.message.ConsumerInfo;
029 import org.activemq.service.DeadLetterPolicy;
030 import org.activemq.service.MessageContainerAdmin;
031 import org.activemq.service.MessageIdentity;
032 import org.activemq.service.QueueListEntry;
033 import org.activemq.service.RedeliveryPolicy;
034 import org.activemq.service.Service;
035 import org.activemq.service.TransactionManager;
036 import org.activemq.service.TransactionTask;
037 import org.activemq.service.impl.DefaultQueueList;
038 import org.activemq.store.MessageStore;
039 import org.activemq.store.RecoveryListener;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 import EDU.oswego.cs.dl.util.concurrent.Executor;
044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045
046 import javax.jms.JMSException;
047
048 import java.util.HashMap;
049 import java.util.List;
050 import java.util.Map;
051
052 /**
053 * A MessageContainer for Durable queues
054 *
055 * @version $Revision: 1.1.1.1 $
056 */
057 public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin {
058
059 private final MessageStore messageStore;
060 private final MemoryBoundedQueueManager queueManager;
061 private final ActiveMQDestination destination;
062 private final Executor threadPool;
063 private final DeadLetterPolicy deadLetterPolicy;
064 private final Log log;
065 private final MemoryBoundedQueue queue;
066
067 private final DefaultQueueList subscriptions = new DefaultQueueList();
068 private final SynchronizedBoolean started = new SynchronizedBoolean(false);
069 private final SynchronizedBoolean running = new SynchronizedBoolean(false);
070 private final Object dispatchMutex = new Object();
071 private final Object subscriptionsMutex = new Object();
072
private long idleTimestamp; // System.currentTimeMillis() taken when the last subscriber was removed; 0 once a consumer has been added
074
075 /**
076 * Construct this beast
077 *
078 * @param threadPool
079 * @param queueManager
080 * @param destination
081 * @param redeliveryPolicy
082 * @param deadLetterPolicy
083 */
084 public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager,
085 ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
086 this.messageStore = messageStore;
087 this.threadPool = threadPool;
088 this.queueManager = queueManager;
089 this.destination = destination;
090 this.deadLetterPolicy = deadLetterPolicy;
091
092 this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName());
093 this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination);
094 }
095
096
097 /**
098 * @return true if there are subscribers waiting for messages
099 */
100 public boolean isActive(){
101 return !subscriptions.isEmpty();
102 }
103
104 /**
105 * @return true if no messages are enqueued
106 */
107 public boolean isEmpty(){
108 return queue.isEmpty();
109 }
110
111 /**
112 * @return the timestamp (ms) from the when the last active subscriber stopped
113 */
114 public long getIdleTimestamp(){
115 return idleTimestamp;
116 }
117
118
119
120 /**
121 * Add a consumer to dispatch messages to
122 *
123 * @param filter
124 * @param info
125 * @param client
126 * @return DurableQueueSubscription
127 * @throws JMSException
128 */
129 public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
130 throws JMSException {
131 DurableQueueSubscription ts = findMatch(info);
132 if (ts == null) {
133 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-"+info.getConsumerId());
134 MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-"+info.getConsumerId());
135 ts = new DurableQueueSubscription(client, queue, ackQueue, filter, info);
136 synchronized (subscriptionsMutex) {
137 idleTimestamp = 0;
138 subscriptions.add(ts);
139 checkRunning();
140 }
141 }
142 return ts;
143 }
144
145 /**
146 * Remove a consumer
147 *
148 * @param info
149 * @throws JMSException
150 */
151 public void removeConsumer(ConsumerInfo info) throws JMSException {
152 synchronized (subscriptionsMutex) {
153 DurableQueueSubscription ts = findMatch(info);
154 if (ts != null) {
155
156 subscriptions.remove(ts);
157 if (subscriptions.isEmpty()) {
158 running.commit(true, false);
159 idleTimestamp = System.currentTimeMillis();
160 }
161
162 // get unacknowledged messages and re-enqueue them
163 List list = ts.getUndeliveredMessages();
164 for (int i = list.size() - 1; i >= 0; i--) {
165 queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
166 }
167
168 // If it is a queue browser, then re-enqueue the browsed
169 // messages.
170 if (ts.isBrowser()) {
171 list = ts.listAckedMessages();
172 for (int i = list.size() - 1; i >= 0; i--) {
173 queue.enqueueFirstNoBlock((MemoryManageable) list
174 .get(i));
175 }
176 ts.removeAllAckedMessages();
177 }
178
179 ts.close();
180 }
181 }
182 }
183
184 /**
185 * start working
186 *
187 * @throws JMSException
188 */
189 public void start() throws JMSException {
190 if (started.commit(false, true)) {
191 messageStore.start();
192 messageStore.recover(new RecoveryListener() {
193 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
194 recoverMessageToBeDelivered(messageIdentity);
195 }
196 });
197 checkRunning();
198 }
199 }
200
201 private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException {
202 DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), messageStore.getMessage(msgId));
203 queue.enqueue(pointer);
204 }
205
206 /**
207 * enqueue a message for dispatching
208 *
209 * @param message
210 * @throws JMSException
211 */
212 public void enqueue(final ActiveMQMessage message) throws JMSException {
213 final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
214 if (message.isAdvisory()) {
215 doAdvisoryDispatchMessage(pointer);
216 }
217 else {
218 messageStore.addMessage(message);
219 // If there is no transaction.. then this executes directly.
220 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
221 public void execute() throws Throwable {
222 queue.enqueue(pointer);
223 checkRunning();
224 }
225 });
226 }
227 }
228
229 public void redeliver(DurableMessagePointer message) {
230 queue.enqueueFirstNoBlock(message);
231 checkRunning();
232 }
233
234 public void redeliver(List messages) {
235 queue.enqueueAllFirstNoBlock(messages);
236 checkRunning();
237 }
238
239 /**
240 * stop working
241 */
242 public void stop() {
243 started.set(false);
244 running.set(false);
245 queue.clear();
246 }
247
248 /**
249 * close down this container
250 *
251 * @throws JMSException
252 */
253 public void close() throws JMSException {
254 if (started.get()) {
255 stop();
256 }
257 synchronized(subscriptionsMutex){
258 QueueListEntry entry = subscriptions.getFirstEntry();
259 while (entry != null) {
260 DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
261 ts.close();
262 entry = subscriptions.getNextEntry(entry);
263 }
264 subscriptions.clear();
265 }
266 }
267
268 /**
269 * do some dispatching
270 */
271 public void run() {
272 // Only allow one thread at a time to dispatch.
273 synchronized (dispatchMutex) {
274 boolean dispatched = false;
275 boolean targeted = false;
276 DurableMessagePointer messagePointer = null;
277 int notDispatchedCount = 0;
278 int sleepTime = 250;
279 int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10
280 // seconds
281 Map messageParts = new HashMap();
282 try {
283 while (started.get() && running.get()) {
284 dispatched = false;
285 targeted = false;
286 synchronized (subscriptionsMutex) {
287 if (!subscriptions.isEmpty()) {
288 messagePointer = (DurableMessagePointer) queue
289 .dequeue(sleepTime);
290 if (messagePointer != null) {
291 ActiveMQMessage message = messagePointer
292 .getMessage();
293 if (!message.isExpired()) {
294
295 QueueListEntry entry = subscriptions
296 .getFirstEntry();
297 while (entry != null) {
298 DurableQueueSubscription ts = (DurableQueueSubscription) entry
299 .getElement();
300 if (ts.isTarget(message)) {
301 targeted = true;
302 if (message.isMessagePart()) {
303 DurableQueueSubscription sameTarget = (DurableQueueSubscription) messageParts
304 .get(message
305 .getParentMessageID());
306 if (sameTarget == null) {
307 sameTarget = ts;
308 messageParts
309 .put(
310 message
311 .getParentMessageID(),
312 sameTarget);
313 }
314 sameTarget
315 .doDispatch(messagePointer);
316 if (message.isLastMessagePart()) {
317 messageParts
318 .remove(message
319 .getParentMessageID());
320 }
321 messagePointer = null;
322 dispatched = true;
323 notDispatchedCount = 0;
324 break;
325 } else if (ts.canAcceptMessages()) {
326 ts.doDispatch(messagePointer);
327 messagePointer = null;
328 dispatched = true;
329 notDispatchedCount = 0;
330 subscriptions.rotate();
331 break;
332 }
333 }
334 entry = subscriptions
335 .getNextEntry(entry);
336 }
337
338 } else {
339 // expire message
340 if (log.isDebugEnabled()) {
341 log.debug("expired message: "
342 + messagePointer);
343 }
344 if (deadLetterPolicy != null) {
345 deadLetterPolicy
346 .sendToDeadLetter(messagePointer
347 .getMessage());
348 }
349 messagePointer = null;
350 }
351 }
352 }
353 }
354 if (!dispatched) {
355 if (messagePointer != null) {
356 if (targeted) {
357 queue.enqueueFirstNoBlock(messagePointer);
358 } else {
359 //no matching subscribers - dump to end and hope one shows up ...
360 queue.enqueueNoBlock(messagePointer);
361
362 }
363 }
364 if (running.get()) {
365 if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping
366 && queue.isEmpty()) {
367 synchronized (running) {
368 running.commit(true, false);
369 }
370 } else {
371 Thread.sleep(sleepTime);
372 }
373 }
374 }
375 }
376 } catch (InterruptedException ie) {
377 //someone is stopping us from another thread
378 } catch (Throwable e) {
379 log.warn("stop dispatching", e);
380 stop();
381 }
382 }
383 }
384
385 private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
386 DurableQueueSubscription result = null;
387 synchronized (subscriptionsMutex) {
388 QueueListEntry entry = subscriptions.getFirstEntry();
389 while (entry != null) {
390 DurableQueueSubscription ts = (DurableQueueSubscription) entry
391 .getElement();
392 if (ts.getConsumerInfo().equals(info)) {
393 result = ts;
394 break;
395 }
396 entry = subscriptions.getNextEntry(entry);
397 }
398 }
399 return result;
400 }
401
402 /**
403 * @return the destination associated with this container
404 */
405 public ActiveMQDestination getDestination() {
406 return destination;
407 }
408
409 /**
410 * @return the destination name
411 */
412 public String getDestinationName() {
413 return destination.getPhysicalName();
414 }
415
416 protected void clear() {
417 queue.clear();
418 }
419
420 protected void removeExpiredMessages() {
421 long currentTime = System.currentTimeMillis();
422 List list = queue.getContents();
423 for (int i = 0;i < list.size();i++) {
424 DurableMessagePointer msgPointer = (DurableMessagePointer) list.get(i);
425 ActiveMQMessage message = msgPointer.getMessage();
426 if (message.isExpired(currentTime)) {
427 // TODO: remove message from message store.
428 queue.remove(msgPointer);
429 if (log.isDebugEnabled()) {
430 log.debug("expired message: " + msgPointer);
431 }
432 }
433 }
434 }
435
436 protected void checkRunning(){
437 if (!running.get() && started.get() && !subscriptions.isEmpty()) {
438 synchronized (running) {
439 if (running.commit(false, true)) {
440 try {
441 threadPool.execute(this);
442 }
443 catch (InterruptedException e) {
444 log.error(this + " Couldn't start executing ",e);
445 }
446 }
447 }
448 }
449 }
450
451
452 /**
453 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
454 */
455 public MessageContainerAdmin getMessageContainerAdmin() {
456 return this;
457 }
458
459 /**
460 * @see org.activemq.service.MessageContainerAdmin#empty()
461 */
462 public void empty() throws JMSException {
463 if( subscriptions.isEmpty() ) {
464 messageStore.removeAllMessages();
465 queue.clear();
466 } else {
467 throw new JMSException("Cannot empty a queue while it is use.");
468 }
469 }
470
471 /**
472 * Dispatch an Advisory Message
473 * @param messagePointer
474 */
475 private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) {
476 ActiveMQMessage message = messagePointer.getMessage();
477 try {
478
479 if (message.isAdvisory() && !message.isExpired()) {
480 synchronized (subscriptionsMutex) {
481 QueueListEntry entry = subscriptions.getFirstEntry();
482 while (entry != null) {
483 DurableQueueSubscription ts = (DurableQueueSubscription) entry
484 .getElement();
485 if (ts.isTarget(message)) {
486 ts.doDispatch(messagePointer);
487 break;
488 }
489 entry = subscriptions.getNextEntry(entry);
490 }
491 }
492 }
493 } catch (JMSException jmsEx) {
494 log.warn("Failed to dispatch advisory", jmsEx);
495 }
496 }
497
498 }