001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service;
020 import javax.jms.JMSException;
021 import org.apache.commons.logging.*;
022 import org.activemq.broker.Broker;
023 import org.activemq.message.ActiveMQDestination;
024 import org.activemq.message.ActiveMQMessage;
025
026 /**
027 * Determines how messages are stored in a dead letter queue
028 *
029 * @version $Revision: 1.1.1.1 $
030 */
031 public class DeadLetterPolicy {
032 /**
033 * Prefix used by dead letter queues
034 */
035 public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
036 private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";
037 private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
038 private Broker broker;
039 private String deadLetterPrefix = DEAD_LETTER_PREFIX;
040 private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
041 private boolean deadLetterEnabled = true;
042 private boolean deadLetterPerDestinationName = true;
043 private boolean storeNonPersistentMessages = true;
044 private boolean noTopicConsumerEnabled = true;
045
046
047 /**
048 * Construct a dead letter policy
049 *
050 * @param broker
051 */
052 public DeadLetterPolicy(Broker broker) {
053 this.broker = broker;
054 }
055
056 /**
057 * Default constructor
058 */
059 public DeadLetterPolicy() {
060 }
061
062 /**
063 * @return Returns the broker.
064 */
065 public Broker getBroker() {
066 return broker;
067 }
068
069 /**
070 * @param broker The broker to set.
071 */
072 public void setBroker(Broker broker) {
073 this.broker = broker;
074 }
075
076 /**
077 * @return Returns the deadLetterEnabled.
078 */
079 public boolean isDeadLetterEnabled() {
080 return deadLetterEnabled;
081 }
082
083 /**
084 * @param deadLetterEnabled The deadLetterEnabled to set.
085 */
086 public void setDeadLetterEnabled(boolean deadLetterEnabled) {
087 this.deadLetterEnabled = deadLetterEnabled;
088 }
089
090 /**
091 * @return Returns the deadLetterPerDestinationName.
092 */
093 public boolean isDeadLetterPerDestinationName() {
094 return deadLetterPerDestinationName;
095 }
096
097 /**
098 * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
099 */
100 public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
101 this.deadLetterPerDestinationName = deadLetterPerDestinationName;
102 }
103
104 /**
105 * @return Returns the deadLetterName.
106 */
107 public String getDeadLetterName() {
108 return deadLetterName;
109 }
110
111 /**
112 * @param deadLetterName The deadLetterName to set.
113 */
114 public void setDeadLetterName(String deadLetterName) {
115 this.deadLetterName = deadLetterName;
116 }
117
118 /**
119 * @return Returns the deadLetterPrefix.
120 */
121 public String getDeadLetterPrefix() {
122 return deadLetterPrefix;
123 }
124
125 /**
126 * @param deadLetterPrefix The deadLetterPrefix to set.
127 */
128 public void setDeadLetterPrefix(String deadLetterPrefix) {
129 this.deadLetterPrefix = deadLetterPrefix;
130 }
131
132 /**
133 * @return Returns the storeNonPersistentMessages.
134 */
135 public boolean isStoreNonPersistentMessages() {
136 return storeNonPersistentMessages;
137 }
138
139 /**
140 * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
141 */
142 public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
143 this.storeNonPersistentMessages = storeNonPersistentMessages;
144 }
145
146 /**
147 * @return Returns the noTopicConsumerEnabled.
148 */
149 public boolean isNoTopicConsumerEnabled() {
150 return noTopicConsumerEnabled;
151 }
152 /**
153 * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
154 */
155 public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
156 this.noTopicConsumerEnabled = noTopicConsumerEnabled;
157 }
158
159 /**
160 * Get the name of the DLQ from the destination provided
161 * @param destination
162 * @return the name of the DLQ for this Destination
163 */
164 public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
165 String deadLetterName = deadLetterPrefix;
166 if (deadLetterPerDestinationName) {
167 deadLetterName += destination.getPhysicalName();
168 }
169 else {
170 deadLetterName += deadLetterName;
171 }
172 return deadLetterName;
173 }
174
175 /**
176 * Send a message to a dead letter queue
177 *
178 * @param message
179 * @throws JMSException
180 */
181 public void sendToDeadLetter(ActiveMQMessage message) throws JMSException {
182 if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages)) {
183 if (broker != null) {
184 String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
185 message.setDispatchedFromDLQ(true);
186 broker.sendToDeadLetterQueue(dlqName, message);
187 if (log.isDebugEnabled()) log.debug("Passed message: " + message + " to DLQ: " + dlqName);
188 }
189 else {
190 log.warn("Broker is not initialized - cannot add to DLQ: " + message);
191 }
192 }else if (log.isDebugEnabled()){
193 log.debug("DLQ not storing message: " + message);
194 }
195 }
196 }