001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.reliable;
020 import java.net.URI;
021 import java.util.List;
022
023 import javax.jms.ExceptionListener;
024 import javax.jms.JMSException;
025
026 import org.activemq.TimeoutExpiredException;
027 import org.activemq.UnsupportedWireFormatException;
028 import org.activemq.io.WireFormat;
029 import org.activemq.message.Packet;
030 import org.activemq.message.PacketListener;
031 import org.activemq.message.Receipt;
032 import org.activemq.message.ReceiptHolder;
033 import org.activemq.transport.TransportChannel;
034 import org.activemq.transport.TransportStatusEvent;
035 import org.activemq.transport.composite.CompositeTransportChannel;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /**
040 * A Composite implementation of a TransportChannel that re-establishes the connection when the embedded channel fails
041 *
042 * @version $Revision: 1.1.1.1 $
043 */
044 public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
045 private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
046 private Object lock = new Object();
047 private long keepAliveTimeout = 60000L;
048
049 /**
050 * Construct this transport
051 *
052 * @param wireFormat
053 */
054 public ReliableTransportChannel(WireFormat wireFormat) {
055 super(wireFormat);
056 setMaximumRetries(0);
057 setEstablishConnectionTimeout(0);
058 setFailureSleepTime(5000L);
059 }
060
061 /**
062 * Construct this transport
063 *
064 * @param wireFormat
065 * @param uris
066 */
067 public ReliableTransportChannel(WireFormat wireFormat, List uris) {
068 super(wireFormat, uris);
069 setMaximumRetries(0);
070 setEstablishConnectionTimeout(0);
071 setFailureSleepTime(5000L);
072 }
073
074 /**
075 * @return pretty print for this
076 */
077 public String toString() {
078 return "ReliableTransportChannel: " + (channel == null ? "No active channel" : channel.toString());
079 }
080
081 /**
082 * Sets the number of milliseconds this channel can be idle after a keep-alive packet
083 * has been sent without being disconnected.
084 *
085 * @param timeoutInterval the timeout interval
086 */
087 public void setKeepAliveTimeout(long timeoutInterval) {
088 this.keepAliveTimeout = timeoutInterval;
089 }
090
091 public long getKeepAliveTimeout() {
092 return keepAliveTimeout;
093 }
094
095 /**
096 * @param packet
097 * @param timeout
098 * @return receipt - or null
099 * @throws JMSException
100 */
101 public Receipt send(Packet packet, int timeout) throws JMSException {
102 do {
103 TransportChannel tc = getEstablishedChannel(timeout);
104 if (tc != null) {
105 try {
106 return tc.send(packet, timeout);
107 }
108 catch (TimeoutExpiredException e) {
109 throw e;
110 }
111 catch (UnsupportedWireFormatException uwf) {
112 throw uwf;
113 }
114 catch (JMSException jmsEx) {
115 if (isPendingStop()) {
116 break;
117 }
118 doReconnect(tc, timeout);
119 }
120 }
121 }
122 while (!closed.get() && !isPendingStop());
123 return null;
124 }
125
126 /**
127 * @param packet
128 * @throws JMSException
129 */
130 public void asyncSend(Packet packet) throws JMSException {
131 long timeout = getEstablishConnectionTimeout();
132 do {
133 TransportChannel tc = getEstablishedChannel(timeout);
134 if (tc != null) {
135 try {
136 tc.asyncSend(packet);
137 break;
138 }
139 catch (TimeoutExpiredException e) {
140 throw e;
141 }
142 catch (UnsupportedWireFormatException uwf) {
143 throw uwf;
144 }
145 catch (JMSException jmsEx) {
146 if (isPendingStop()) {
147 break;
148 }
149 doReconnect(tc, timeout);
150 }
151 }
152 }
153 while (!closed.get() && !isPendingStop());
154 }
155
156 public ReceiptHolder asyncSendWithReceipt(Packet packet) throws JMSException {
157 long timeout = getEstablishConnectionTimeout();
158 do {
159 TransportChannel tc = getEstablishedChannel(timeout);
160 if (tc != null) {
161 try {
162 return tc.asyncSendWithReceipt(packet);
163 }
164 catch (TimeoutExpiredException e) {
165 throw e;
166 }
167 catch (UnsupportedWireFormatException uwf) {
168 throw uwf;
169 }
170 catch (JMSException jmsEx) {
171 if (isPendingStop()) {
172 break;
173 }
174 doReconnect(tc, timeout);
175 }
176 }
177 }
178 while (!closed.get() && !isPendingStop());
179 return null;
180 }
181
182 protected void configureChannel() {
183 channel.setPacketListener(this);
184 channel.setExceptionListener(this);
185 channel.addTransportStatusEventListener(this);
186 }
187
188 protected URI extractURI(List list) throws JMSException {
189 int idx = 0;
190 if (list.size() > 1) {
191 SMLCGRandom rand = new SMLCGRandom();
192 do {
193 idx = (int) (rand.nextDouble() * list.size());
194 }
195 while (idx < 0 || idx >= list.size());
196 }
197 Object answer = list.remove(idx);
198 if (answer instanceof URI) {
199 return (URI) answer;
200 }
201 else {
202 log.error("#### got: " + answer + " of type: " + answer.getClass());
203 return null;
204 }
205 }
206
207 /**
208 * consume a packet from the enbedded channel
209 *
210 * @param packet to consume
211 */
212 public void consume(Packet packet) {
213 //do processing
214 //avoid a lock
215 PacketListener listener = getPacketListener();
216 if (listener != null) {
217 listener.consume(packet);
218 }
219 }
220
221 /**
222 * handle exception from the embedded channel
223 *
224 * @param jmsEx
225 */
226 public void onException(JMSException jmsEx) {
227 TransportChannel tc = this.channel;
228 if (jmsEx instanceof UnsupportedWireFormatException) {
229 fireException(jmsEx);
230 }
231 else {
232 try {
233 doReconnect(tc, getEstablishConnectionTimeout());
234 }
235 catch (JMSException ex) {
236 ex.setLinkedException(jmsEx);
237 fireException(ex);
238 }
239 }
240 }
241
242 /**
243 * stop this channel
244 */
245 public void stop() {
246 super.stop();
247 fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED);
248 }
249
250 /**
251 * Fire a JMSException to the exception listener
252 *
253 * @param jmsEx
254 */
255 protected void fireException(JMSException jmsEx) {
256 ExceptionListener listener = getExceptionListener();
257 if (listener != null) {
258 listener.onException(jmsEx);
259 }
260 }
261
262 protected TransportChannel getEstablishedChannel(long timeout) throws JMSException {
263 if (!closed.get() && this.channel == null && !isPendingStop()) {
264 establishConnection(timeout);
265 }
266 return this.channel;
267 }
268
269 protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException {
270 setTransportConnected(false);
271 if (!closed.get() && !isPendingStop()) {
272 synchronized (lock) {
273 //Loss of connectivity can be signalled from more than one
274 //thread - hence the check here - we want to avoid doing it more than once
275 if (this.channel == currentChannel) {
276 fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
277 try {
278 establishConnection(timeout);
279 }
280 catch (JMSException jmsEx) {
281 fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
282 throw jmsEx;
283 }
284 setTransportConnected(true);
285 fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
286 }
287 }
288 }
289 }
290 }