001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.activeio;
020
021 import java.io.IOException;
022
023 import javax.jms.JMSException;
024
025 import org.activeio.AcceptListener;
026 import org.activeio.AsynchChannel;
027 import org.activeio.AsynchChannelServer;
028 import org.activeio.Channel;
029 import org.activeio.adapter.SynchToAsynchChannelAdapter;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032 import org.activemq.io.WireFormat;
033 import org.activemq.transport.TransportServerChannelSupport;
034 import org.activemq.util.JMSExceptionHelper;
035
036 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
037
038 /**
039 * Binds to a well known port and listens for Sockets ...
040 *
041 * @version $Revision: 1.1.1.1 $
042 */
043 public class ActiveIOTransportServerChannel extends TransportServerChannelSupport implements AcceptListener {
044 private static final Log log = LogFactory.getLog(ActiveIOTransportServerChannel.class);
045
046 private final WireFormat wireFormat;
047
048 private final AsynchChannelServer server;
049
050 private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
051
052 /**
053 * @param wireFormat
054 * @param server
055 */
056 public ActiveIOTransportServerChannel(WireFormat wireFormat, AsynchChannelServer server) {
057 super(server.getBindURI());
058 this.wireFormat = wireFormat;
059 this.server = server;
060 server.setAcceptListener(this);
061 }
062
063 public void start() throws JMSException {
064 try {
065 super.start();
066 server.start();
067 } catch (IOException e) {
068 throw JMSExceptionHelper.newJMSException(e.getMessage(), e);
069 }
070 }
071
072 public void stop() throws JMSException {
073 if (closed.commit(false, true)) {
074 super.stop();
075 server.dispose();
076 }
077 }
078
079 /**
080 * @return pretty print of this
081 */
082 public String toString() {
083 return "ActiveIOTransportServerChannel@" + getUrl();
084 }
085
086 public void onAccept(Channel c) {
087 if (closed.get()) {
088 c.dispose();
089 return;
090 }
091
092 AsynchChannel channel = SynchToAsynchChannelAdapter.adapt(c);
093 // If the channel is not allready buffered.. lets buffer it.
094 //if (channel.narrow(WriteBufferedAsynchChannel.class) == null
095 // && channel.narrow(WriteBufferedSynchChannel.class) == null) {
096 // channel = new WriteBufferedAsynchChannel(channel);
097 //}
098 addClient(new ActiveIOTransportChannel(wireFormat.copy(), channel));
099 }
100
101 public void onAcceptError(IOException error) {
102 if (!closed.get()) {
103 log.warn("Caught exception accepting connection: " + error, error);
104 try {
105 stop();
106 } catch (JMSException e) {
107 }
108 }
109
110 }
111
112 }