001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.io.impl;
020 import java.io.DataInput;
021 import java.io.DataInputStream;
022 import java.io.DataOutput;
023 import java.io.DataOutputStream;
024 import java.io.IOException;
025 import java.io.ObjectStreamException;
026 import java.io.Serializable;
027 import java.util.Arrays;
028 import java.util.HashMap;
029 import java.util.Map;
030
031 import org.activemq.io.WireFormat;
032 import org.activemq.io.util.WireByteArrayInputStream;
033 import org.activemq.io.util.WireByteArrayOutputStream;
034 import org.activemq.message.CachedValue;
035 import org.activemq.message.Packet;
036
037 /**
038 * This is a stateful AbstractDefaultWireFormat which implements value caching. Not optimal for use by
039 * many concurrent threads. One DefaultWireFormat is typically allocated per client connection.
040 *
041 * @version $Revision: 1.1.1.1 $
042 */
043 public class DefaultWireFormat extends AbstractDefaultWireFormat implements Serializable {
044
045 private static final long serialVersionUID = -1454851936411678612L;
046
047 private static final int MAX_CACHE_SIZE = Short.MAX_VALUE/2; //needs to be a lot less than Short.MAX_VALUE
048
049 static final short NULL_VALUE = -1;
050 static final short CLEAR_CACHE = -2;
051
052 //
053 // Fields used during a write.
054 //
055 protected transient final Object writeMutex = new Object();
056 protected transient WireByteArrayOutputStream internalBytesOut;
057 protected transient DataOutputStream internalDataOut;
058 protected transient WireByteArrayOutputStream cachedBytesOut;
059 protected transient DataOutputStream cachedDataOut;
060 private Map writeValueCache = new HashMap();
061 protected transient short cachedKeyGenerator;
062 protected transient short lastWriteValueCacheEvictionPosition=500;
063
064 //
065 // Fields used during a read.
066 //
067 protected transient final Object readMutex = new Object();
068 protected transient WireByteArrayInputStream internalBytesIn;
069 protected transient DataInputStream internalDataIn;
070 private Object[] writeValueCacheArray = new Object[MAX_CACHE_SIZE];
071 private Object[] readValueCacheArray = new Object[MAX_CACHE_SIZE];
072
073
074 /**
075 * Default Constructor
076 */
077 public DefaultWireFormat() {
078 internalBytesOut = new WireByteArrayOutputStream();
079 internalDataOut = new DataOutputStream(internalBytesOut);
080 internalBytesIn = new WireByteArrayInputStream();
081 internalDataIn = new DataInputStream(internalBytesIn);
082 this.currentWireFormatVersion = WIRE_FORMAT_VERSION;
083 this.cachedBytesOut = new WireByteArrayOutputStream();
084 this.cachedDataOut = new DataOutputStream(cachedBytesOut);
085 }
086
087 /**
088 * @return new WireFormat
089 */
090 public WireFormat copy() {
091 DefaultWireFormat format = new DefaultWireFormat();
092 format.setCachingEnabled(cachingEnabled);
093 format.setCurrentWireFormatVersion(getCurrentWireFormatVersion());
094 return format;
095 }
096
097
098 private Object readResolve() throws ObjectStreamException {
099 return new DefaultWireFormat();
100 }
101
102 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
103 PacketWriter writer = getWriter(packet);
104 if (writer != null) {
105 synchronized (writeMutex) {
106 internalBytesOut.reset();
107 writer.writePacket(packet, internalDataOut);
108 internalDataOut.flush();
109 //reuse the byte buffer in the ByteArrayOutputStream
110 byte[] data = internalBytesOut.getData();
111 int count = internalBytesOut.size();
112 dataOut.writeByte(packet.getPacketType());
113 dataOut.writeInt(count);
114 //byte[] data = internalBytesOut.toByteArray();
115 //int count = data.length;
116 //dataOut.writeInt(count);
117 packet.setMemoryUsage(count);
118 dataOut.write(data, 0, count);
119 }
120 }
121 return null;
122 }
123
124 protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
125 synchronized (readMutex) {
126 Packet packet = reader.createPacket();
127 int length = dataIn.readInt();
128 packet.setMemoryUsage(length);
129 byte[] data = new byte[length];
130 dataIn.readFully(data);
131 //then splat into the internal datainput
132 internalBytesIn.restart(data);
133 reader.buildPacket(packet, internalDataIn);
134 return packet;
135 }
136 }
137
138 /**
139 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
140 * BytesOutputStream
141 *
142 * @param packet
143 * @return a byte array representing the packet using some wire protocol
144 * @throws IOException
145 */
146 public byte[] toBytes(Packet packet) throws IOException {
147 byte[] data = null;
148 PacketWriter writer = getWriter(packet);
149
150 if (writer != null) {
151
152 synchronized (writeMutex) {
153 internalBytesOut.reset();
154 internalDataOut.writeByte(packet.getPacketType());
155 internalDataOut.writeInt(-1);//the length
156 writer.writePacket(packet, internalDataOut);
157 internalDataOut.flush();
158 data = internalBytesOut.toByteArray();
159 }
160
161 // lets subtract the header offset from the length
162 int length = data.length - 5;
163 packet.setMemoryUsage(length);
164 //write in the length to the data
165 data[1] = (byte) ((length >>> 24) & 0xFF);
166 data[2] = (byte) ((length >>> 16) & 0xFF);
167 data[3] = (byte) ((length >>> 8) & 0xFF);
168 data[4] = (byte) ((length >>> 0) & 0xFF);
169 }
170
171 return data;
172 }
173
174 ///////////////////////////////////////////////////////////////
175 //
176 // Methods to handle cached values
177 //
178 ///////////////////////////////////////////////////////////////
179
180 public Object getValueFromReadCache(short key) {
181 if( key < 0 || key > readValueCacheArray.length )
182 return null;
183 return readValueCacheArray[key];
184 }
185
186 protected short getWriteCachedKey(Object key) throws IOException{
187 if (key != null){
188 Short result = null;
189 result = (Short)writeValueCache.get(key);
190 if (result == null){
191 result = getNextCacheId();
192 writeValueCache.put(key,result);
193 writeValueCacheArray[result.shortValue()]=key;
194 updateCachedValue(result.shortValue(),key);
195 }
196 return result.shortValue();
197 }
198 return DefaultWireFormat.NULL_VALUE;
199 }
200
201 /**
202 * @return
203 */
204 private Short getNextCacheId() {
205 Short result;
206 result = new Short(cachedKeyGenerator++);
207 // once we fill the cache start reusing old cache locations to avoid memory leaks.
208 if (cachedKeyGenerator >= MAX_CACHE_SIZE) {
209 cachedKeyGenerator=0;
210 }
211
212 lastWriteValueCacheEvictionPosition++;
213 if (lastWriteValueCacheEvictionPosition >= MAX_CACHE_SIZE) {
214 lastWriteValueCacheEvictionPosition=0;
215 }
216
217 if( writeValueCacheArray[lastWriteValueCacheEvictionPosition] !=null ) {
218 Object o = writeValueCacheArray[lastWriteValueCacheEvictionPosition];
219 writeValueCache.remove(o);
220 writeValueCacheArray[lastWriteValueCacheEvictionPosition]=null;
221 }
222 return result;
223 }
224
225 protected void validateWriteCache() throws IOException {
226 if (cachingEnabled) {
227 if (writeValueCache.size() >= MAX_CACHE_SIZE) {
228 writeValueCache.clear();
229 Arrays.fill(writeValueCacheArray,null);
230 cachedKeyGenerator = 0;
231 updateCachedValue((short) -1, null);// send update to peer to
232 // clear the peer cache
233 }
234 }
235 }
236
237 protected void handleCachedValue(CachedValue cv) {
238 if (cv != null) {
239 if (cv.getId() == CLEAR_CACHE) {
240 Arrays.fill(readValueCacheArray, null);
241 } else if (cv.getId() != NULL_VALUE) {
242 readValueCacheArray[cv.getId()] = cv.getValue();
243 }
244 }
245 }
246
247 private synchronized void updateCachedValue(short key, Object value) throws IOException {
248 if (cachedValueWriter == null) {
249 cachedValueWriter = new CachedValueWriter();
250 }
251 CachedValue cv = new CachedValue();
252 cv.setId(key);
253 cv.setValue(value);
254 cachedBytesOut.reset();
255 cachedValueWriter.writePacket(cv, cachedDataOut);
256 cachedDataOut.flush();
257 byte[] data = cachedBytesOut.getData();
258 int count = cachedBytesOut.size();
259 getTransportDataOut().writeByte(cv.getPacketType());
260 getTransportDataOut().writeInt(count);
261 getTransportDataOut().write(data, 0, count);
262 }
263 }