001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.streams;
020 import java.io.EOFException;
021 import java.io.IOException;
022 import java.io.OutputStream;
023 import javax.jms.BytesMessage;
024 import javax.jms.JMSException;
025 import javax.jms.MessageProducer;
026 import org.activemq.message.ActiveMQBytesMessage;
027
028 /**
029 * OutputStream that writes on to JMS via the supplied JMS MessageProducer
030 *
031 * @version $Revision: 1.1.1.1 $
032 */
033 public class JMSOutputStream extends OutputStream {
034 private final static int BUFFER_SIZE = 16 * 1024;
035 private byte[] buf;
036 private int count;
037 private boolean closed;
038 private MessageProducer producer;
039
040
041 /**
042 * Creates a new output stream to write data using the supplied JMS MessageProducer
043 * @param producer
044 */
045 public JMSOutputStream(MessageProducer producer) {
046 this(producer,BUFFER_SIZE);
047 }
048
049 /**
050 * Creates a new output stream to write data using the supplied JMS MessageProducer
051 * @param producer
052 * @param size the buffer size.
053 * @throws IllegalArgumentException if size <= 0.
054 */
055 public JMSOutputStream(MessageProducer producer, int size) {
056 if (size <= 0) {
057 throw new IllegalArgumentException("Buffer size <= 0");
058 }
059 buf = new byte[size];
060 this.producer = producer;
061 }
062
063 /**
064 * write a byte on to the stream
065 *
066 * @param b - byte to write
067 * @throws IOException
068 */
069 public void write(int b) throws IOException {
070 checkClosed();
071 if (availableBufferToWrite() < 1) {
072 flush();
073 }
074 buf[count++] = (byte) b;
075 }
076
077
078 /**
079 * write a byte array to the stream
080 *
081 * @param b the byte buffer
082 * @param off the offset into the buffer
083 * @param len the length of data to write
084 * @throws IOException
085 */
086 public void write(byte b[], int off, int len) throws IOException {
087 checkClosed();
088 if (availableBufferToWrite() < len) {
089 flush();
090 }
091 if (buf.length >= len) {
092 System.arraycopy(b, off, buf, count, len);
093 count += len;
094 }
095 else {
096 writeBuffer(b, off, len);
097 }
098 }
099
100 /**
101 * flush the data to the output stream
102 * This doesn't call flush on the underlying outputstream, because
103 * Tcp is particularly efficent at doing this itself ....
104 *
105 * @throws IOException
106 */
107 public void flush() throws IOException {
108 checkClosed();
109 if (count > 0 ) {
110 writeBuffer(buf, 0, count);
111 count = 0;
112 }
113 }
114
115 /**
116 * close this stream
117 *
118 * @throws IOException
119 */
120 public void close() throws IOException {
121 if (!closed) {
122 write(-1);
123 flush();
124 super.close();
125 closed = true;
126 try {
127 producer.close();
128 }
129 catch (JMSException jmsEx) {
130 IOException ioEx = new IOException(jmsEx.getMessage());
131 throw ioEx;
132 }
133 }
134 }
135
136
137 /**
138 * Checks that the stream has not been closed
139 *
140 * @throws IOException
141 */
142 protected void checkClosed() throws IOException {
143 if (closed) {
144 throw new EOFException("Cannot write to the stream any more it has already been closed");
145 }
146 }
147
148 /**
149 * @return the amount free space in the buffer
150 */
151 private int availableBufferToWrite() {
152 return buf.length - count;
153 }
154
155 private void writeBuffer(byte[] buf,int offset, int length) throws IOException{
156 try {
157 BytesMessage message = new ActiveMQBytesMessage();
158 message.writeBytes(buf,offset,length);
159 producer.send(message);
160 }catch(JMSException jmsEx){
161 IOException ioEx = new IOException(jmsEx.getMessage());
162 throw ioEx;
163 }
164 }
165 }