001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.activeio;
020 import java.io.DataInputStream;
021 import java.io.DataOutputStream;
022 import java.io.EOFException;
023 import java.io.IOException;
024 import java.net.SocketException;
025
026 import javax.jms.JMSException;
027
028 import org.activeio.AsynchChannel;
029 import org.activeio.AsynchChannelListener;
030 import org.activeio.adapter.PacketByteArrayOutputStream;
031 import org.activeio.adapter.PacketInputStream;
032 import org.activeio.net.SocketMetadata;
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.activemq.io.WireFormat;
036 import org.activemq.message.Packet;
037 import org.activemq.transport.TransportChannelSupport;
038 import org.activemq.transport.TransportStatusEvent;
039 import org.activemq.util.JMSExceptionHelper;
040
041 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
042
043 /**
044 * A tcp implementation of a TransportChannel
045 *
046 * @version $Revision: 1.1.1.1 $
047 */
048 public class ActiveIOTransportChannel extends TransportChannelSupport implements AsynchChannelListener {
049
050 private static final Log log = LogFactory.getLog(ActiveIOTransportChannel.class);
051 private final Object writeLock = new Object();
052 private final AsynchChannel asynchChannel;
053 private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
054 private final PacketByteArrayOutputStream outputBuffer = new PacketByteArrayOutputStream();
055 private final DataOutputStream dataOut = new DataOutputStream(outputBuffer);
056
057 private final PacketAggregator aggregator = new PacketAggregator() {
058 protected void packetAssembled(org.activeio.Packet packet) {
059 try {
060 Packet p = getWireFormat().readPacket(new DataInputStream(new PacketInputStream(packet)));
061 if( p!=null ) {
062 doConsumePacket(p);
063 }
064 } catch (IOException e) {
065 onPacketError(e);
066 }
067 }
068 };
069
070 public ActiveIOTransportChannel(WireFormat wireFormat, AsynchChannel asynchChannel) {
071 super(wireFormat);
072 this.asynchChannel = asynchChannel;
073 asynchChannel.setAsynchChannelListener(this);
074
075 // Enable TcpNoDelay if possible
076 SocketMetadata socket = (SocketMetadata) asynchChannel.narrow(SocketMetadata.class);
077 if(socket!=null) {
078 try {
079 socket.setTcpNoDelay(true);
080 } catch (SocketException e) {
081 }
082 }
083 }
084
085 public void start() throws JMSException {
086 try {
087 asynchChannel.start();
088 } catch (IOException e) {
089 throw JMSExceptionHelper.newJMSException(e.getMessage(),e);
090 }
091 }
092
093 public void stop() {
094 if (closed.commit(false, true)) {
095 super.stop();
096 asynchChannel.dispose();
097 }
098 }
099
100 public void forceDisconnect() {
101 log.debug("Forcing disconnect");
102 asynchChannel.dispose();
103 }
104
105
106 public void asyncSend(Packet packet) throws JMSException {
107 doAsyncSend(packet);
108 }
109
110 protected Packet doAsyncSend(Packet packet) throws JMSException {
111 Packet response = null;
112 try {
113 synchronized (writeLock) {
114 response = getWireFormat().writePacket(packet, dataOut);
115 dataOut.flush();
116 asynchChannel.write( outputBuffer.getPacket() );
117 asynchChannel.flush();
118 outputBuffer.reset();
119 }
120 }
121 catch (IOException e) {
122 if (closed.get()) {
123 log.trace("Caught exception while closed: " + e, e);
124 }
125 else {
126 throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
127 }
128 }
129 catch (JMSException e) {
130 if (closed.get()) {
131 log.trace("Caught exception while closed: " + e, e);
132 }
133 else {
134 throw e;
135 }
136 }
137 return response;
138 }
139
140 public void onPacket(org.activeio.Packet packet) {
141 try {
142 aggregator.addRawPacket(packet);
143 } catch (IOException e) {
144 onPacketError(e);
145 }
146 }
147
148 public void onPacketError(IOException ex) {
149 if (!closed.get()) {
150 if (!pendingStop){
151 setPendingStop(true);
152 setTransportConnected(false);
153 if (ex instanceof EOFException && isServerSide() == false) {
154 log.warn("Peer closed connection", ex);
155 }
156 else {
157 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
158 }
159 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
160 }
161 stop();
162 }
163 }
164
165 public AsynchChannel getAsynchChannel() {
166 return asynchChannel;
167 }
168
169 /**
170 * @return the current version of this wire format
171 */
172 public int getCurrentWireFormatVersion() {
173 return getWireFormat().getCurrentWireFormatVersion();
174 }
175
176 }