001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.tcp;
020
021 import java.io.EOFException;
022 import java.io.FilterOutputStream;
023 import java.io.IOException;
024 import java.io.OutputStream;
025
/**
 * An optimized buffered output stream for Tcp.
 * <p>
 * Bytes are collected in an internal buffer and handed to the underlying stream in a single
 * <code>write</code> call, either when the buffer cannot hold the next write or when
 * {@link #flush()} is called. A single array write that is larger than the whole buffer bypasses
 * the buffer and goes straight to the underlying stream (after any pending bytes have been
 * written, so ordering is preserved).
 * <p>
 * This class performs no synchronization of its own.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class TcpBufferedOutputStream extends FilterOutputStream {
    /** Buffer size used by the single-argument constructor. */
    private static final int BUFFER_SIZE = 4096;

    /** Holds bytes that have been written to this stream but not yet to the underlying stream. */
    private final byte[] buf;

    /** Number of valid bytes currently held in {@link #buf}. */
    private int count;

    /** Set once {@link #close()} has been attempted; further writes are then rejected. */
    private boolean closed;

    /**
     * Creates a new buffered output stream with the default buffer size (4096 bytes).
     *
     * @param out the underlying output stream.
     */
    public TcpBufferedOutputStream(OutputStream out) {
        this(out, BUFFER_SIZE);
    }

    /**
     * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
     * buffer size.
     *
     * @param out  the underlying output stream.
     * @param size the buffer size.
     * @throws IllegalArgumentException if size <= 0.
     */
    public TcpBufferedOutputStream(OutputStream out, int size) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /**
     * Writes a single byte to the stream, first emptying the buffer if it is full.
     *
     * @param b - byte to write (only the low-order eight bits are stored)
     * @throws IOException if the stream has been closed or the underlying write fails
     */
    public void write(int b) throws IOException {
        checkClosed();
        if (availableBufferToWrite() < 1) {
            flush();
        }
        buf[count++] = (byte) b;
    }

    /**
     * Writes part of a byte array to the stream.
     * <p>
     * If the data does not fit in the free space of the buffer, the buffer is emptied first. Data that
     * is larger than the entire buffer is then written directly to the underlying stream; anything
     * smaller is copied into the buffer.
     *
     * @param b   the byte buffer
     * @param off the offset into the buffer
     * @param len the length of data to write
     * @throws IOException if the stream has been closed or the underlying write fails
     */
    public void write(byte[] b, int off, int len) throws IOException {
        checkClosed();
        if (availableBufferToWrite() < len) {
            // Push out pending bytes first so they stay ahead of the new data.
            flush();
        }
        if (buf.length >= len) {
            System.arraycopy(b, off, buf, count, len);
            count += len;
        }
        else {
            // Larger than the whole buffer: copying would gain nothing, write it through.
            out.write(b, off, len);
        }
    }

    /**
     * Writes any buffered bytes to the underlying output stream.
     * This doesn't call flush on the underlying outputstream, because
     * Tcp is particularly efficient at doing this itself ....
     *
     * @throws IOException if the underlying write fails; the buffered bytes are retained in that case
     */
    public void flush() throws IOException {
        if (count > 0 && out != null) {
            out.write(buf, 0, count);
            count = 0;
        }
    }

    /**
     * Closes this stream. Buffered bytes are written out by the superclass close, which invokes
     * {@link #flush()} before closing the underlying stream.
     * <p>
     * The stream is marked closed even if closing fails, so that later writes are rejected instead
     * of being buffered for a stream that can no longer be used. Calling this method again after
     * it has been called once has no effect.
     *
     * @throws IOException if flushing or closing the underlying stream fails
     */
    public void close() throws IOException {
        if (closed) {
            return;
        }
        try {
            super.close();
        }
        finally {
            closed = true;
        }
    }

    /**
     * Checks that the stream has not been closed
     *
     * @throws IOException (an {@link EOFException}) if {@link #close()} has already been called
     */
    protected void checkClosed() throws IOException {
        if (closed) {
            throw new EOFException("Cannot write to the stream any more it has already been closed");
        }
    }

    /**
     * @return the amount free space in the buffer
     */
    private int availableBufferToWrite() {
        return buf.length - count;
    }
}