001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service.boundedvm;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023
024 import javax.jms.JMSException;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.activemq.broker.BrokerClient;
029 import org.activemq.filter.Filter;
030 import org.activemq.io.util.MemoryBoundedQueue;
031 import org.activemq.message.ActiveMQDestination;
032 import org.activemq.message.ActiveMQMessage;
033 import org.activemq.message.ConsumerInfo;
034 import org.activemq.message.MessageAck;
035 import org.activemq.service.MessageContainer;
036 import org.activemq.service.MessageContainerAdmin;
037 import org.activemq.service.MessageIdentity;
038 import org.activemq.service.Service;
039
040 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042
043 /**
044 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
045 * messages
046 *
047 * @version $Revision: 1.1.1.1 $
048 */
049 public class TransientTopicBoundedMessageContainer
050 implements
051 MessageContainer,
052 Service,
053 Runnable,
054 MessageContainerAdmin {
055 private SynchronizedBoolean started;
056 private TransientTopicBoundedMessageManager manager;
057 private BrokerClient client;
058 private MemoryBoundedQueue queue;
059 private Thread worker;
060 private CopyOnWriteArrayList subscriptions;
061 private Log log;
062
063 /**
064 * Construct this beast
065 *
066 * @param manager
067 * @param client
068 * @param queue
069 */
070 public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client,
071 MemoryBoundedQueue queue) {
072 this.manager = manager;
073 this.client = client;
074 this.queue = queue;
075 this.started = new SynchronizedBoolean(false);
076 this.subscriptions = new CopyOnWriteArrayList();
077 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
078 }
079
080 /**
081 * @return true if this Container has no active subscriptions
082 */
083 public boolean isInactive() {
084 return subscriptions.isEmpty();
085 }
086
087 /**
088 * @return the BrokerClient this Container is dispatching to
089 */
090 public BrokerClient getBrokerClient() {
091 return client;
092 }
093
094 /**
095 * Add a consumer to dispatch messages to
096 *
097 * @param filter
098 * @param info
099 */
100 public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
101 TransientTopicSubscription ts = findMatch(info);
102 if (ts == null) {
103 ts = new TransientTopicSubscription(filter, info, client);
104 subscriptions.add(ts);
105 }
106 return ts;
107 }
108
109 /**
110 * Remove a consumer
111 *
112 * @param info
113 */
114 public void removeConsumer(ConsumerInfo info) {
115 TransientTopicSubscription ts = findMatch(info);
116 if (ts != null) {
117 subscriptions.remove(ts);
118 }
119 }
120
121 /**
122 * start working
123 */
124 public void start() {
125 if (started.commit(false, true)) {
126 if (manager.isDecoupledDispatch()) {
127 worker = new Thread(this, "TransientTopicDispatcher");
128 worker.setPriority(Thread.NORM_PRIORITY + 2);
129 worker.start();
130 }
131 }
132 }
133
134 /**
135 * See if this container should get this message and dispatch it
136 *
137 * @param sender the BrokerClient the message came from
138 * @param message
139 * @return true if it is a valid container
140 * @throws JMSException
141 */
142 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
143 boolean result = false;
144 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
145 List tmpList = null;
146 for (Iterator i = subscriptions.iterator();i.hasNext();) {
147 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
148
149 if (!(ts.client.getChannel() != null && ts.client.getChannel().isMulticast() && ts.client.equals(sender)) && ts.isTarget(message)) {
150 if (tmpList == null) {
151 tmpList = new ArrayList();
152 }
153 tmpList.add(ts);
154 }
155 }
156 dispatchToQueue(message, tmpList);
157 result = tmpList != null;
158 }
159 return result;
160 }
161
162 /**
163 * stop working
164 */
165 public void stop() {
166 started.set(false);
167 queue.clear();
168 }
169
170 /**
171 * close down this container
172 */
173 public void close() {
174 if (started.get()) {
175 stop();
176 }
177 queue.close();
178 }
179
180
181 /**
182 * do some dispatching
183 */
184 public void run() {
185 int count = 0;
186 ActiveMQMessage message = null;
187 while (started.get()) {
188 try {
189 message = (ActiveMQMessage) queue.dequeue(2000);
190 if (message != null) {
191 if (!message.isExpired()) {
192 client.dispatch(message);
193 if (++count == 250) {
194 count = 0;
195 Thread.yield();
196 }
197 }else {
198 if (log.isDebugEnabled()){
199 log.debug("Message: " + message + " has expired");
200 }
201 }
202 }
203 }
204 catch (Exception e) {
205 stop();
206 log.warn("stop dispatching", e);
207 }
208 }
209 }
210
211 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
212 if (list != null && !list.isEmpty()) {
213 int[] ids = new int[list.size()];
214 for (int i = 0;i < list.size();i++) {
215 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
216 ids[i] = ts.getConsumerInfo().getConsumerNo();
217 }
218 message = message.shallowCopy();
219 message.setConsumerNos(ids);
220 if (manager.isDecoupledDispatch()) {
221 queue.enqueue(message);
222 }
223 else {
224 client.dispatch(message);
225 }
226 }
227 }
228
229 private TransientTopicSubscription findMatch(ConsumerInfo info) {
230 TransientTopicSubscription result = null;
231 for (Iterator i = subscriptions.iterator();i.hasNext();) {
232 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
233 if (ts.getConsumerInfo().equals(info)) {
234 result = ts;
235 break;
236 }
237 }
238 return result;
239 }
240
241 /**
242 * @param destination
243 * @return true if a
244 */
245 public boolean hasConsumerFor(ActiveMQDestination destination) {
246 for (Iterator i = subscriptions.iterator();i.hasNext();) {
247 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
248 ConsumerInfo info = ts.getConsumerInfo();
249 if (info.getDestination().matches(destination)) {
250 return true;
251 }
252 }
253 return false;
254 }
255
256 /**
257 * @return the destination name
258 */
259 public String getDestinationName() {
260 return "";
261 }
262
263 /**
264 * @param msg
265 * @return @throws JMSException
266 */
267 public void addMessage(ActiveMQMessage msg) throws JMSException {
268 }
269
270 /**
271 * @param messageIdentity
272 * @param ack
273 * @throws JMSException
274 */
275 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
276 }
277
278 /**
279 * @param messageIdentity
280 * @return @throws JMSException
281 */
282 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
283 return null;
284 }
285
286 /**
287 * @param messageIdentity
288 * @throws JMSException
289 */
290 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
291 }
292
293 /**
294 * @param messageIdentity
295 * @param ack
296 * @throws JMSException
297 */
298 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
299 }
300
301 /**
302 * @param messageIdentity
303 * @return @throws JMSException
304 */
305 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
306 return false;
307 }
308
309 /**
310 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
311 */
312 public MessageContainerAdmin getMessageContainerAdmin() {
313 return this;
314 }
315
316 /**
317 * @see org.activemq.service.MessageContainerAdmin#empty()
318 */
319 public void empty() throws JMSException {
320 // TODO implement me
321 }
322
323 /**
324 * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
325 */
326 public boolean isDeadLetterQueue() {
327 return false;
328 }
329 }