001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.service.impl;
019
020 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
021 import org.apache.commons.logging.Log;
022 import org.apache.commons.logging.LogFactory;
023 import org.activemq.broker.BrokerClient;
024 import org.activemq.message.ActiveMQMessage;
025 import org.activemq.service.MessageContainerManager;
026 import org.activemq.service.Service;
027 import org.activemq.service.Subscription;
028
029 import javax.jms.JMSException;
030 import java.util.Map;
031 import java.util.Iterator;
032
033 /**
034 * A Dispatcher that polls for updates for active Message Consumers
035 *
036 * @version $Revision: 1.1.1.1 $
037 */
038 public class DispatchWorker implements Runnable, Service {
039 private static final Log log = LogFactory.getLog(DispatchWorker.class);
040 private static final int POLL_TIMEOUT = 250;
041
042 private Map subscriptions = new ConcurrentHashMap(1000, 0.75f);
043 private Object lock = new Object();
044 private boolean active = true;
045 private boolean started = false;
046 private MessageContainerManager containerManager;
047
048 /**
049 * Register the MessageContainerManager for the Dispatcher
050 *
051 * @param mcm
052 */
053 public void register(MessageContainerManager mcm) {
054 this.containerManager = mcm;
055 }
056
057 /**
058 * Called to indicate that there is work to do on a Subscription this will wake up a Dispatch Worker if it is
059 * waiting for messages to dispatch
060 */
061 public void wakeup() {
062 synchronized (lock) {
063 active = true;
064 lock.notifyAll();
065 }
066 }
067
068 /**
069 * Add an active subscription
070 *
071 * @param client
072 * @param sub
073 */
074 public void addActiveSubscription(BrokerClient client, Subscription sub) {
075 if (log.isDebugEnabled()) {
076 log.info("Adding subscription: " + sub + " to client: " + client);
077 }
078 subscriptions.put(sub, client);
079 }
080
081 /**
082 * remove an active subscription
083 *
084 * @param client
085 * @param sub
086 */
087 public void removeActiveSubscription(BrokerClient client, Subscription sub) {
088 if (log.isDebugEnabled()) {
089 log.info("Removing subscription: " + sub + " from client: " + client);
090 }
091 subscriptions.remove(sub);
092 }
093
094 /**
095 * dispatch messages to active Consumers
096 *
097 * @see java.lang.Runnable#run()
098 */
099 public void run() {
100 while (started) {
101 doPoll();
102 boolean dispatched = false;
103 try {
104 // our collection will not throw concurrent modification exception
105 for (Iterator iter = subscriptions.keySet().iterator(); iter.hasNext();) {
106 Subscription sub = (Subscription) iter.next();
107 if (sub != null && sub.isReadyToDispatch()) {
108 dispatched = dispatchMessages(sub, dispatched);
109 }
110 }
111 }
112 catch (JMSException jmsEx) {
113 log.error("Could not dispatch to Subscription: " + jmsEx, jmsEx);
114 }
115 if (!dispatched) {
116 synchronized (lock) {
117 active = false;
118 if (!active && started) {
119 try {
120 lock.wait(POLL_TIMEOUT);
121 }
122 catch (InterruptedException e) {
123 }
124 }
125 }
126 }
127 }
128 }
129
130
131 /**
132 * start the DispatchWorker
133 *
134 * @see org.activemq.service.Service#start()
135 */
136 public void start() {
137 started = true;
138 }
139
140 /**
141 * stop the DispatchWorker
142 *
143 * @see org.activemq.service.Service#stop()
144 */
145 public void stop() {
146 started = false;
147 }
148
149
150 // Implementation methods
151 //-------------------------------------------------------------------------
152
153 protected boolean dispatchMessages(Subscription subscription, boolean dispatched) throws JMSException {
154 ActiveMQMessage[] msgs = subscription.getMessagesToDispatch();
155 if (msgs != null && msgs.length > 0) {
156 BrokerClient client = (BrokerClient) subscriptions.get(subscription);
157 if (client == null) {
158 log.warn("Null client for subscription: " + subscription);
159 }
160 else {
161 for (int i = 0; i < msgs.length; i++) {
162 ActiveMQMessage msg = msgs[i].shallowCopy();
163
164 if (log.isDebugEnabled()) {
165 log.debug("Dispatching message: " + msg);
166 }
167 int[] consumerNos = new int[1];
168 consumerNos[0] = subscription.getConsumerNumber();
169 msg.setConsumerNos(consumerNos);
170 client.dispatch(msg);
171 dispatched = true;
172 }
173 }
174 }
175 return dispatched;
176 }
177
178 protected void doPoll() {
179 if (containerManager != null && started) {
180 try {
181 containerManager.poll();
182 }
183 catch (JMSException e) {
184 log.error("Error polling from the ContainerManager: ", e);
185 }
186 }
187 }
188 }