001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.multicast;
020 import java.io.IOException;
021 import java.io.Serializable;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.Iterator;
025 import java.util.Map;
026 import javax.jms.JMSException;
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.activemq.io.impl.DefaultWireFormat;
030 import org.activemq.message.ActiveMQMessage;
031 import org.activemq.message.ActiveMQObjectMessage;
032 import org.activemq.message.Packet;
033 import org.activemq.message.PacketListener;
034 import org.activemq.transport.DiscoveryAgentSupport;
035 import org.activemq.transport.DiscoveryEvent;
036 import org.activemq.util.IdGenerator;
037 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
038 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
039 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
040
041
042 /**
043 * An agent used to discover other instances of a service
044 *
045 * @version $Revision: 1.1.1.1 $
046 */
047 public class MulticastDiscoveryAgent extends DiscoveryAgentSupport implements PacketListener, Runnable {
048 private static final Log log = LogFactory.getLog(MulticastDiscoveryAgent.class);
049 /**
050 * default URI used for discovery
051 */
052 public static final String DEFAULT_DISCOVERY_URI = "multicast://224.1.2.3:6066";
053 // private static final String KEEP_ALIVE_TYPE = "KEEP_ALIVE";
054 private static final String SERVICE_TYPE = "SERVICE";
055 private static final String ALIVE_TYPE = "ALIVE_TYPE";
056 private static final String SERVICE_NAME = "SERVICE_NAME";
057 private static final String CHANNEL_NAME = "CHANNEL_NAME";
058 private static final long DEFAULT_KEEP_ALIVE_TIMEOUT = 5000;
059 private static final int DEFAULT_TIMEOUT_COUNT = 2;
060 private ConcurrentHashMap services;
061 private ConcurrentHashMap keepAliveMap;
062 private SynchronizedBoolean started;
063 private MulticastTransportChannel channel;
064 private Thread runner;
065 private IdGenerator idGen;
066 private String localId;
067 private URI uri;
068 private int timeoutCount;
069 private long keepAliveTimeout;
070 private long timeoutExpiration;
071 //private ActiveMQMessage keepAliveMessage;
072 private ActiveMQObjectMessage serviceMessage;
073 private String serviceName = "";
074 private int timeToLive = 1;
075 private String channelName = "defaultChannel";
076
077 /**
078 * Construct a discovery agent that uses multicast
079 *
080 * @param channelName
081 * @throws JMSException
082 */
083 public MulticastDiscoveryAgent(String channelName) throws JMSException {
084 init();
085 this.channelName = channelName;
086 try {
087 setUri(new URI(DEFAULT_DISCOVERY_URI));
088 }
089 catch (URISyntaxException e) {
090 JMSException jmsEx = new JMSException("URI Syntax exception: " + e.getMessage());
091 jmsEx.setLinkedException(e);
092 throw jmsEx;
093 }
094 }
095
096 public MulticastDiscoveryAgent(URI uri) {
097 init();
098 this.uri = uri;
099 }
100
101 private void init() {
102 this.started = new SynchronizedBoolean(false);
103 this.services = new ConcurrentHashMap();
104 this.keepAliveMap = new ConcurrentHashMap();
105 this.idGen = new IdGenerator();
106 this.localId = idGen.generateId();
107 this.keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
108 this.timeoutCount = DEFAULT_TIMEOUT_COUNT;
109 this.timeoutExpiration = this.keepAliveTimeout * timeoutCount;
110 }
111
112
113 /**
114 * @return Returns the keepAliveTimeout.
115 */
116 public long getKeepAliveTimeout() {
117 return keepAliveTimeout;
118 }
119
120 /**
121 * @param keepAliveTimeout The keepAliveTimeout to set.
122 */
123 public void setKeepAliveTimeout(long keepAliveTimeout) {
124 this.keepAliveTimeout = keepAliveTimeout;
125 }
126
127 /**
128 * @return Returns the timeoutCount.
129 */
130 public int getTimeoutCount() {
131 return timeoutCount;
132 }
133
134 /**
135 * @param timeoutCount The timeoutCount to set.
136 */
137 public void setTimeoutCount(int timeoutCount) {
138 this.timeoutCount = timeoutCount;
139 }
140
141 /**
142 * @return Returns the localId.
143 */
144 public String getLocalId() {
145 return localId;
146 }
147
148 /**
149 * @param localId The localId to set.
150 */
151 public void setLocalId(String localId) {
152 this.localId = localId;
153 }
154
155 /**
156 * @return Returns the uri.
157 */
158 public URI getUri() {
159 return uri;
160 }
161
162 /**
163 * @param uri The uri to set.
164 */
165 public void setUri(URI uri) {
166 this.uri = uri;
167 }
168
169 /**
170 * @return the timeToLive of multicast packets used for discovery
171 */
172 public int getTimeToLive() {
173 return this.timeToLive;
174 }
175
176 /**
177 * @param timeToLive The timeToLive for multicast packets used in discovery.
178 * @throws IOException
179 */
180 public void setTimeToLive(int timeToLive) throws IOException {
181 this.timeToLive = timeToLive;
182 if (channel != null) {
183 channel.setTimeToLive(timeToLive);
184 }
185 }
186
187 /**
188 * @return Returns the channelName.
189 */
190 public String getChannelName() {
191 return channelName;
192 }
193
194 /**
195 * @param channelName The channelName to set.
196 */
197 public void setChannelName(String channelName) {
198 this.channelName = channelName;
199 }
200
201 /**
202 * @return a pretty print of this instance
203 */
204 public String toString() {
205 return "MulticastDiscoveryAgent:" + serviceName;
206 }
207
208 /**
209 * @return the number of active services, including self
210 */
211 public int getServicesCount() {
212 return (this.serviceMessage != null ? 1 : 0) + services.size();
213 }
214
215 /**
216 * Register a service for other discover nodes
217 *
218 * @param name
219 * @param details
220 * @throws JMSException
221 */
222 public void registerService(String name, Map details) throws JMSException {
223 if (this.serviceMessage != null){
224 //notify the old service has stopped
225 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false);
226 sendService();
227 }
228 this.serviceName = name;
229 this.serviceMessage = new ActiveMQObjectMessage();
230 this.serviceMessage.setJMSType(SERVICE_TYPE);
231 this.serviceMessage.setStringProperty(SERVICE_NAME, name);
232 this.serviceMessage.setStringProperty(CHANNEL_NAME, channelName);
233 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, true);
234 this.serviceMessage.setObject((Serializable) details);
235 sendService();
236 }
237
238 /**
239 * start this discovery agent
240 *
241 * @throws JMSException
242 */
243 public void start() throws JMSException {
244 if (started.commit(false, true)) {
245 this.timeoutExpiration = this.keepAliveTimeout * timeoutCount;
246 channel = new MulticastTransportChannel(new DefaultWireFormat(), uri);
247
248 channel.setClientID(localId);
249 channel.setPacketListener(this);
250 try {
251 channel.setTimeToLive(getTimeToLive());
252 }
253 catch (IOException e) {
254 JMSException jmsEx = new JMSException("Set time to live failed");
255 jmsEx.setLinkedException(e);
256 throw jmsEx;
257 }
258 log.info("Starting multicast discovery agent on URI: " + uri + " with clientID: " + channel.getClientID());
259
260 channel.start();
261 runner = new Thread(this);
262 runner.setName(toString());
263 runner.setDaemon(true);
264 runner.setPriority(Thread.MAX_PRIORITY);
265 runner.start();
266 sendService();
267 fireServiceStarted(serviceMessage);
268 }
269 }
270
271 /**
272 * stop this discovery agent
273 *
274 * @throws JMSException
275 */
276 public void stop() throws JMSException {
277 boolean doStop = false;
278 synchronized (started) {
279 doStop = started.get();
280 if (doStop) {
281 if (this.serviceMessage != null){
282 //notify the old service has stopped
283 this.serviceMessage.setBooleanProperty(ALIVE_TYPE, false);
284 sendService();
285 }
286 channel.stop();
287 started.set(false);
288 }
289 }
290 if (doStop) {
291 fireServiceStopped(serviceMessage);
292 }
293 }
294
295 /**
296 * send a keep alive message
297 */
298 public void run() {
299 try {
300 int count = 0;
301 while (started.get()) {
302 sendService();
303 log.debug(serviceName + " sent keep alive");
304 if (++count >= timeoutCount) {
305 count = 0;
306 checkNodesAlive();
307 }
308 Thread.sleep(getKeepAliveTimeout());
309 }
310 }
311 catch (Throwable e) {
312 log.error(toString() + " run failed", e);
313 }
314 }
315
316 /**
317 * Consume multicast packets
318 *
319 * @param packet
320 */
321 public void consume(Packet packet) {
322 try {
323 if (packet != null && packet.isJMSMessage()) {
324 ActiveMQMessage msg = (ActiveMQMessage) packet;
325 String receivedChannelName = msg.getStringProperty(CHANNEL_NAME);
326 if (receivedChannelName != null && receivedChannelName.equals(channelName)) {
327 String type = msg.getJMSType();
328 if (type != null) {
329 if (type.equals(SERVICE_TYPE)) {
330 processService(msg);
331 }
332 else {
333 log.warn(toString() + " received Message of unknown type: " + type);
334 }
335 }
336 else {
337 log.error(toString() + " message type is null");
338 }
339 }
340 else {
341 if (log.isDebugEnabled()) {
342 log.debug("Discarded discovery message for channel: " + receivedChannelName + " in channel: " + channelName);
343 }
344 }
345 }
346 else {
347 log.warn(toString() + " received unexpected packet: " + packet);
348 }
349 }
350 catch (Throwable e) {
351 log.error(toString() + " couldn't process packet: " + packet, e);
352 }
353 }
354
355
356
357 private void sendService() throws JMSException {
358 if (started.get() && channel != null && !channel.isPendingStop() && serviceMessage != null) {
359 channel.asyncSend(serviceMessage);
360 }
361 }
362
363
364 private void processService(ActiveMQMessage message) throws JMSException {
365 if (message != null) {
366 ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) message;
367 String name = objMsg.getStringProperty(SERVICE_NAME);
368
369 if (log.isDebugEnabled()) {
370 log.debug("Service message received for: " + name);
371 }
372 addService(name);
373 ActiveMQObjectMessage oldMsg = (ActiveMQObjectMessage) services.get(name);
374 services.put(name, objMsg);
375 if (oldMsg == null) {
376 fireServiceStarted(objMsg);
377 //send out that we are here!
378 sendService();
379 }
380 if (message.getBooleanProperty(ALIVE_TYPE)) {
381 addService(name);
382 }
383 else {
384 removeService(name);
385 }
386 }
387 }
388
389 private void fireServiceStarted(ActiveMQObjectMessage message) throws JMSException {
390 if (message != null) {
391 String name = message.getStringProperty(SERVICE_NAME);
392 Map map = (Map) message.getObject();
393 DiscoveryEvent event = new DiscoveryEvent(this, name, map);
394 fireAddService(event);
395 }
396 }
397
398 private void fireServiceStopped(ActiveMQObjectMessage message) throws JMSException {
399 if (message != null) {
400 String name = message.getStringProperty(SERVICE_NAME);
401 Map map = (Map) message.getObject();
402 DiscoveryEvent event = new DiscoveryEvent(this, name, map);
403 fireRemoveService(event);
404 }
405 }
406
407 private void addService(String name) {
408 long timestamp = System.currentTimeMillis();
409 SynchronizedLong activeTime = (SynchronizedLong) keepAliveMap.get(name);
410 if (activeTime == null) {
411 activeTime = new SynchronizedLong(0);
412 keepAliveMap.put(name, activeTime);
413 }
414 activeTime.set(timestamp);
415 }
416
417 private void removeService(String name) throws JMSException {
418 keepAliveMap.remove(name);
419 ActiveMQObjectMessage message = (ActiveMQObjectMessage) services.remove(name);
420 if (message != null) {
421 fireServiceStopped(message);
422 }
423 }
424
425 private void checkNodesAlive() throws JMSException {
426 long timestamp = System.currentTimeMillis();
427 long timeout = timestamp - timeoutExpiration;
428 for (Iterator i = keepAliveMap.entrySet().iterator();i.hasNext();) {
429 Map.Entry entry = (Map.Entry) i.next();
430 SynchronizedLong activeTime = (SynchronizedLong) entry.getValue();
431 if (activeTime.get() < timeout) {
432 String name = entry.getKey().toString();
433 removeService(name);
434 log.warn(serviceName + " Expiring node: " + name);
435 }
436 }
437 }
438 }