001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.service.impl;
019
020 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
021 import org.activemq.broker.BrokerClient;
022 import org.activemq.message.ActiveMQDestination;
023 import org.activemq.message.ActiveMQMessage;
024 import org.activemq.service.DeadLetterPolicy;
025 import org.activemq.service.Dispatcher;
026 import org.activemq.service.MessageContainer;
027 import org.activemq.service.MessageContainerManager;
028 import org.activemq.service.Subscription;
029
030 import javax.jms.Destination;
031 import javax.jms.JMSException;
032 import java.util.Collections;
033 import java.util.HashMap;
034 import java.util.Iterator;
035 import java.util.Map;
036
037 /**
038 * @version $Revision: 1.1.1.1 $
039 */
040 public abstract class MessageContainerManagerSupport implements MessageContainerManager {
041 protected Dispatcher dispatcher;
042 protected Map messageContainers = new ConcurrentHashMap();
043 private Map destinations = new ConcurrentHashMap();
044 private boolean maintainDestinationStats = true;
045 private DeadLetterPolicy deadLetterPolicy;
046
047 public MessageContainerManagerSupport(Dispatcher dispatcher) {
048 this.dispatcher = dispatcher;
049 dispatcher.register(this);
050 }
051
052 public Map getDestinations() {
053 return Collections.unmodifiableMap(destinations);
054 }
055
056 public void start() throws JMSException {
057 dispatcher.start();
058 }
059
060 public void stop() throws JMSException {
061 dispatcher.stop();
062 JMSException firstException = null;
063 try {
064 dispatcher.stop();
065 }
066 catch (JMSException e) {
067 firstException = e;
068 }
069
070 // lets stop all the containers
071 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
072 MessageContainer container = (MessageContainer) iter.next();
073 try {
074 container.stop();
075 }
076 catch (JMSException e) {
077 if (firstException == null) {
078 firstException = e;
079 }
080 }
081 }
082 if (firstException != null) {
083 throw firstException;
084 }
085
086 }
087
088 public synchronized MessageContainer getContainer(String destinationName) throws JMSException {
089 MessageContainer container = (MessageContainer) messageContainers.get(destinationName);
090 if (container == null) {
091 container = createContainer(destinationName);
092 container.start();
093 messageContainers.put(destinationName, container);
094
095 destinations.put(destinationName, createDestination(destinationName));
096 }
097 return container;
098 }
099
100 public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
101 // since get creates the container if it does not exist.
102 getContainer(dest.getPhysicalName());
103 }
104
105 synchronized public Map getMessageContainerAdmins() {
106 HashMap map = new HashMap(messageContainers.size());
107 for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
108 MessageContainer mc = (MessageContainer) iter.next();
109 map.put(mc.getDestinationName(), mc.getMessageContainerAdmin());
110 }
111 return Collections.unmodifiableMap(map);
112 }
113
114
115 public synchronized void destroyMessageContainer(ActiveMQDestination dest) throws JMSException {
116 MessageContainer container = (MessageContainer) messageContainers.get(dest.getPhysicalName());
117 if (container != null) {
118 container.getMessageContainerAdmin().empty();
119 container.stop();
120 messageContainers.remove(dest.getPhysicalName());
121 destinations.remove(dest.getPhysicalName());
122 }
123 }
124
125 // Properties
126 //-------------------------------------------------------------------------
127 public boolean isMaintainDestinationStats() {
128 return maintainDestinationStats;
129 }
130
131 public void setMaintainDestinationStats(boolean maintainDestinationStats) {
132 this.maintainDestinationStats = maintainDestinationStats;
133 }
134
135 /**
136 * @return the DeadLetterPolicy for this Container Manager
137 */
138 public DeadLetterPolicy getDeadLetterPolicy(){
139 return deadLetterPolicy;
140 }
141
142 /**
143 * Set the DeadLetterPolicy for this Container Manager
144 * @param policy
145 */
146 public void setDeadLetterPolicy(DeadLetterPolicy policy){
147 this.deadLetterPolicy = policy;
148 }
149
150 // Implementation methods
151 //-------------------------------------------------------------------------
152
153 /**
154 * Factory method to create a new {@link Destination}
155 */
156 protected abstract Destination createDestination(String destinationName);
157
158 /**
159 * Factory method to create a new {@link MessageContainer}
160 */
161 protected abstract MessageContainer createContainer(String destinationName) throws JMSException;
162
163 /**
164 * Loads the container for the given name and destination on startup
165 */
166 protected void loadContainer(String destinationName, Destination destination) throws JMSException {
167 destinations.put(destinationName, destination);
168
169 MessageContainer container = createContainer(destinationName);
170 container.start();
171 messageContainers.put(destinationName, container);
172 }
173
174 /**
175 * Updates the message acknowledgement stats
176 *
177 * @param client
178 * @param subscription
179 */
180 protected void updateAcknowledgeStats(BrokerClient client, Subscription subscription) {
181 if (isMaintainDestinationStats()) {
182 // lets lookup the destination which has the stats hanging off it
183 String name = subscription.getDestination().getPhysicalName();
184 ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
185 destination.getStats().onMessageAck();
186 }
187 }
188
189 /**
190 * Updates the message sending stats
191 *
192 * @param client
193 * @param message
194 * @throws JMSException
195 */
196 protected void updateSendStats(BrokerClient client, ActiveMQMessage message) throws JMSException {
197 if (isMaintainDestinationStats()) {
198 // lets lookup the destination which has the stats hanging off it
199 String name = message.getJMSActiveMQDestination().getPhysicalName();
200 ActiveMQDestination destination = (ActiveMQDestination) destinations.get(name);
201 if (destination != null){
202 destination.getStats().onMessageSend(message);
203 }
204 }
205 }
206
207 }