001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.streams;
020 import java.io.EOFException;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import javax.jms.JMSException;
024 import javax.jms.MessageConsumer;
025
026 import org.activemq.io.util.ByteArray;
027 import org.activemq.message.ActiveMQMessage;
028
029 /**
030 * An inputStream that reads data from a MessageConsumer
031 *
032 * @version $Revision: 1.1.1.1 $
033 */
034 public class JMSInputStream extends InputStream {
035 private static final int ARRAY_SIZE = 10;
036 private boolean closed;
037 protected ByteArray[] arrays = new ByteArray[ARRAY_SIZE];
038 private int offset;
039 private int current = 0;
040 protected int clen = 0;
041 private int markArray = -1;
042 private int markOffset = -1;
043 private MessageConsumer consumer;
044
045 /**
046 * Construct an input stream to read from a JMS Consumer
047 *
048 * @param consumer
049 */
050 public JMSInputStream(MessageConsumer consumer) {
051 this.consumer = consumer;
052 }
053
054 /**
055 * Read the next byte from this stream.
056 *
057 * @return the next byte
058 * @throws IOException
059 */
060 public int read() throws IOException {
061 if (closed)
062 throw new EOFException("JMSInputStream is closed");
063 if (current == clen) {
064 fillBuffer(1);
065 }
066 int c = (arrays[current].get(offset) & 0xff);
067 offset++;
068 if (offset == arrays[current].getLength()) {
069 offset = 0;
070 releaseBuffer(current);
071 current++;
072 }
073 return c;
074 }
075
076 /**
077 * Read data from this input stream into the given byte array starting at offset 0 for b.length bytes. Returns the
078 * actual number of bytes read;
079 *
080 * @param b
081 * @return the number of bytes read
082 * @throws IOException
083 */
084 public int read(byte b[]) throws IOException {
085 return read(b, 0, b.length);
086 }
087
088 /**
089 * Read data from this input stream into the given byte array starting at offset 'off' for 'len' bytes. Returns the
090 * actual number of bytes read.
091 *
092 * @param b buffer to read data into
093 * @param off offset into b
094 * @param len the maximum length
095 * @return the number of bytes actually read
096 * @throws IOException
097 */
098 public int read(byte b[], int off, int len) throws IOException {
099 if (closed)
100 throw new EOFException("JMSInputStream is closed");
101 int n = off;
102 int total = 0;
103 int last = Math.min(off + len, b.length);
104 if (current == clen) {
105 fillBuffer(len);
106 }
107 while ((current < clen) && (n < last)) {
108 int num_left = arrays[current].getLength() - offset;
109 int tocopy = Math.min(num_left, last - n);
110 System.arraycopy(arrays[current].getBuf(), offset, b, n, tocopy);
111 total += tocopy;
112 n += tocopy;
113 offset += tocopy;
114 if (offset == arrays[current].getLength()) {
115 offset = 0;
116 releaseBuffer(current);
117 current++;
118 }
119 }
120 return total;
121 }
122
123 /**
124 * Skip n bytes in this stream; returns the number of bytes actually skipped (which may be less than the number
125 * requested).
126 *
127 * @param length the number of bytes to skip
128 * @return the number of bytes actually skipped
129 * @throws IOException
130 */
131 public long skip(long length) throws IOException {
132 if (closed)
133 throw new EOFException("JMSInputStream is closed!");
134 int requested = Math.min((int) length, Integer.MAX_VALUE);
135 int totalskipped = 0;
136 while ((current < clen) && (arrays[current] != null) && (requested > 0)) {
137 if (current == clen) {
138 break;
139 }
140 int num_left = arrays[current].getLength() - offset;
141 if (num_left < requested) {
142 requested -= num_left;
143 totalskipped += num_left;
144 releaseBuffer(current);
145 current++;
146 offset = 0;
147 }
148 else {
149 totalskipped += requested;
150 offset += requested;
151 requested = 0;
152 }
153 }
154 return totalskipped;
155 }
156
157 /**
158 * Return the number of bytes available for reading.
159 *
160 * @return the number of bytes available
161 * @throws IOException
162 */
163 public int available() throws IOException {
164 if (closed)
165 throw new EOFException("JMSInputStream is closed!");
166 fillBuffer(0);
167 if (current == clen)
168 return 0;
169 int num_left = arrays[current].getLength() - offset;
170 for (int i = current + 1;i < clen;i++) {
171 if (arrays[i] == null)
172 break;
173 num_left += arrays[i].getLength();
174 }
175 return num_left;
176 }
177
178 /**
179 * close the stream and the MessageConsumer
180 */
181 public void close() {
182 try {
183 consumer.close();
184 }
185 catch (JMSException jmsEx) {
186 }
187 }
188
189 /**
190 * @return true
191 */
192 public boolean markSupported() {
193 return true;
194 }
195
196 /**
197 * Returns the stream to the position of the previous mark().
198 *
199 * @throws IOException
200 */
201 public void reset() throws IOException {
202 if (markArray == -1)
203 throw new IOException("PooledArrayInputStream not marked!");
204 current = markArray;
205 offset = markOffset;
206 markArray = -1;
207 }
208
209 /**
210 * Set the stream's mark to the current position.
211 *
212 * @param readlimit
213 */
214 public void mark(int readlimit) {
215 markArray = current;
216 markOffset = offset;
217 }
218
219 /**
220 * release up to the current buffer to GC
221 *
222 * @param index
223 */
224 private void releaseBuffer(int index) {
225 if (markArray < 0 || index < markArray) {
226 for (int i = 0;i <= index;i++) {
227 arrays[index] = null;
228 }
229 }
230 }
231
232 /**
233 * fill the buffer
234 *
235 * @param requiredLength
236 * @throws IOException
237 */
238 private void fillBuffer(int requiredLength) throws IOException {
239 int len = 0;
240 try {
241 do {
242 if (!closed) {
243 ActiveMQMessage msg = null;
244 if (len == 0 && requiredLength > 0) {
245 msg = (ActiveMQMessage) consumer.receive(2000);
246 }
247 else {
248 msg = (ActiveMQMessage) consumer.receiveNoWait();
249 }
250 if (msg != null) {
251 ByteArray ba = msg.getBodyAsBytes();
252 if (ba != null) {
253 len += ba.getLength();
254 process(ba);
255 }
256 }
257 else if (closed) {
258 break;
259 }
260 }
261 }
262 while (len < requiredLength && !closed);
263 }
264 catch (JMSException jmsEx) {
265 throw new IOException(jmsEx.getMessage());
266 }
267 }
268
269 /**
270 * Add an array to this PooledArrayInputStream.
271 *
272 * @param ba
273 */
274 private void process(ByteArray ba) {
275 if (current == clen && (clen + 1) == arrays.length) {
276 offset = 0;
277 current = 0;
278 clen = 0;
279 if (arrays.length > ARRAY_SIZE && markArray == -1) {
280 arrays = new ByteArray[ARRAY_SIZE];
281 }
282 }
283 arrays[clen] = ba;
284 clen++;
285 if (clen == arrays.length) {
286 ByteArray[] old = arrays;
287 arrays = new ByteArray[old.length + ARRAY_SIZE];
288 System.arraycopy(old, 0, arrays, 0, old.length);
289 }
290 }
291 }