001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.tcp;
020
021 import java.io.IOException;
022 import java.net.InetAddress;
023 import java.net.InetSocketAddress;
024 import java.net.ServerSocket;
025 import java.net.Socket;
026 import java.net.SocketTimeoutException;
027 import java.net.URI;
028 import java.net.URISyntaxException;
029 import java.net.UnknownHostException;
030
031 import javax.jms.JMSException;
032
033 import org.activemq.io.WireFormat;
034 import org.activemq.transport.TransportServerChannelSupport;
035 import org.activemq.util.JMSExceptionHelper;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
040 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042
043 /**
044 * Binds to a well known port and listens for Sockets ...
045 *
046 * @version $Revision: 1.1.1.1 $
047 */
048 public class TcpTransportServerChannel extends TransportServerChannelSupport implements Runnable {
049 private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class);
050 protected static final int DEFAULT_BACKLOG = 500;
051 private WireFormat wireFormat;
052 private Thread serverSocketThread;
053 private ServerSocket serverSocket;
054 private SynchronizedBoolean closed;
055 private SynchronizedBoolean started;
056 private boolean useAsyncSend = false;
057 private int maxOutstandingMessages = 10;
058 private int backlog = DEFAULT_BACKLOG;
059
060 /**
061 * Default Constructor
062 *
063 * @param bindAddr
064 * @throws JMSException
065 */
066 public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr) throws JMSException {
067 super(bindAddr);
068 this.wireFormat = wireFormat;
069 closed = new SynchronizedBoolean(false);
070 started = new SynchronizedBoolean(false);
071 try {
072 serverSocket = createServerSocket(bindAddr);
073 serverSocket.setSoTimeout(2000);
074 updatePhysicalUri(bindAddr);
075 }
076 catch (Exception se) {
077 System.out.println(se);
078 se.printStackTrace();
079 throw JMSExceptionHelper.newJMSException("Bind to " + bindAddr + " failed: " + se.getMessage(), se);
080 }
081 }
082
083 public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) throws JMSException {
084 super(serverSocket.getInetAddress().toString());
085 this.wireFormat = wireFormat;
086 this.serverSocket = serverSocket;
087 closed = new SynchronizedBoolean(false);
088 started = new SynchronizedBoolean(false);
089 InetAddress address = serverSocket.getInetAddress();
090 try {
091 updatePhysicalUri(new URI("tcp", "", address.getHostName(), 0, "", "", ""));
092 }
093 catch (URISyntaxException e) {
094 throw JMSExceptionHelper.newJMSException("Failed to extract URI: : " + e.getMessage(), e);
095 }
096 }
097
098 public void start() throws JMSException {
099 super.start();
100 if (started.commit(false, true)) {
101 log.info("Listening for connections at: " + getUrl());
102 serverSocketThread = new Thread(this, toString());
103 serverSocketThread.setDaemon(true);
104 serverSocketThread.start();
105 }
106 }
107
108 public void stop() throws JMSException {
109 if (closed.commit(false, true)) {
110 super.stop();
111 try {
112 if (serverSocket != null) {
113 serverSocket.close();
114 if (serverSocketThread != null) {
115 serverSocketThread.join();
116 serverSocketThread = null;
117 }
118 }
119 }
120 catch (Throwable e) {
121 throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e);
122 }
123 }
124 }
125
126 public InetSocketAddress getSocketAddress() {
127 return (InetSocketAddress) serverSocket.getLocalSocketAddress();
128 }
129
130 /**
131 * @return pretty print of this
132 */
133 public String toString() {
134 return "TcpTransportServerChannel@" + getUrl();
135 }
136
137 /**
138 * pull Sockets from the ServerSocket
139 */
140 public void run() {
141 while (!closed.get()) {
142 Socket socket = null;
143 try {
144 socket = serverSocket.accept();
145 if (socket != null) {
146 if (closed.get()) {
147 socket.close();
148 }
149 else {
150 // have thread per channel for sending messages and a thread for receiving them
151 PooledExecutor executor = null;
152 if (useAsyncSend) {
153 executor = new PooledExecutor(new BoundedBuffer(maxOutstandingMessages), 1);
154 executor.waitWhenBlocked();
155 executor.setKeepAliveTime(1000);
156 }
157 TcpTransportChannel channel = createTransportChannel(socket, executor);
158 addClient(channel);
159 }
160 }
161 }
162 catch (SocketTimeoutException ste) {
163 //expect this to happen
164 }
165 catch (Throwable e) {
166 if (!closed.get()) {
167 log.warn("run()", e);
168 }
169 }
170 }
171 }
172
173 protected TcpTransportChannel createTransportChannel(Socket socket, PooledExecutor executor) throws JMSException {
174 TcpTransportChannel channel = new TcpTransportChannel(this,wireFormat.copy(), socket, executor);
175 return channel;
176 }
177
178 // Properties
179 //-------------------------------------------------------------------------
180 public boolean isUseAsyncSend() {
181 return useAsyncSend;
182 }
183
184 public void setUseAsyncSend(boolean useAsyncSend) {
185 this.useAsyncSend = useAsyncSend;
186 }
187
188 public int getMaxOutstandingMessages() {
189 return maxOutstandingMessages;
190 }
191
192 public void setMaxOutstandingMessages(int maxOutstandingMessages) {
193 this.maxOutstandingMessages = maxOutstandingMessages;
194 }
195
196 public int getBacklog() {
197 return backlog;
198 }
199
200 public void setBacklog(int backlog) {
201 this.backlog = backlog;
202 }
203
204 public WireFormat getWireFormat() {
205 return wireFormat;
206 }
207
208 public void setWireFormat(WireFormat wireFormat) {
209 this.wireFormat = wireFormat;
210 }
211
212 // Implementation methods
213 //-------------------------------------------------------------------------
214 /**
215 * In cases where we construct ourselves with a zero port we need to regenerate the URI with the real physical port
216 * so that people can connect to us via discovery
217 */
218 protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException {
219 URI newURI = new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket
220 .getLocalPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment());
221 setUrl(newURI.toString());
222 }
223
224 /**
225 * Factory method to create a new ServerSocket
226 *
227 * @throws UnknownHostException
228 * @throws IOException
229 */
230 protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException {
231 ServerSocket answer = null;
232 String host = bind.getHost();
233 host = (host == null || host.length() == 0) ? "localhost" : host;
234 InetAddress addr = InetAddress.getByName(host);
235 if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
236 answer = new ServerSocket(bind.getPort(), backlog);
237 }
238 else {
239 answer = new ServerSocket(bind.getPort(), backlog, addr);
240 }
241 return answer;
242 }
243 }