001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 * Copyright 2004 Hiram Chirino
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019 package org.activemq.io.impl;
020
021 import java.io.ByteArrayOutputStream;
022 import java.io.DataInput;
023 import java.io.DataInputStream;
024 import java.io.DataOutput;
025 import java.io.DataOutputStream;
026 import java.io.IOException;
027 import java.io.ObjectStreamException;
028
029 import org.activeio.PacketData;
030 import org.activeio.adapter.PacketByteArrayOutputStream;
031 import org.activeio.adapter.PacketInputStream;
032 import org.activemq.io.WireFormat;
033 import org.activemq.message.CachedValue;
034 import org.activemq.message.Packet;
035
036 /**
037 * Provides a stateless implementation of AbstractDefaultWireFormat. Safe for use by multiple threads and incurs no locking overhead.
038 *
039 * @version $Revision: 1.1.1.1 $
040 */
041 public class StatelessDefaultWireFormat extends AbstractDefaultWireFormat {
042
043 private static final long serialVersionUID = -2648674156081593006L;
044
045 public Packet writePacket(Packet packet, DataOutput dataOut) throws IOException {
046
047 PacketWriter writer = getWriter(packet);
048 if (writer != null) {
049
050 PacketByteArrayOutputStream internalBytesOut = new PacketByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
051 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);
052 writer.writePacket(packet, internalDataOut);
053 internalDataOut.close();
054
055 org.activeio.Packet p = internalBytesOut.getPacket();
056 int count = p.remaining();
057
058 dataOut.writeByte(packet.getPacketType());
059 dataOut.writeInt(count);
060 packet.setMemoryUsage(count);
061 p.writeTo(dataOut);
062
063 }
064 return null;
065 }
066
067 /**
068 * Write a Packet to a PacketByteArrayOutputStream
069 *
070 * @param packet
071 * @param dataOut
072 * @return a response packet - or null
073 * @throws IOException
074 */
075 public org.activeio.Packet writePacket(Packet packet, PacketByteArrayOutputStream paos) throws IOException {
076 PacketWriter writer = getWriter(packet);
077 if (writer != null) {
078
079 // We may not be writing to the start of the PAOS.
080 int startPosition = paos.position();
081 // Skip space for the headers.
082 paos.skip(5);
083 // Stream the data.
084 DataOutputStream data = new DataOutputStream(paos);
085 writer.writePacket(packet, data);
086 data.close();
087 org.activeio.Packet rc = paos.getPacket();
088
089 int count = rc.remaining()-(startPosition+5);
090 packet.setMemoryUsage(count);
091
092 // Now write the headers to the packet.
093
094 rc.position(startPosition);
095 PacketData pd = new PacketData(rc);
096 pd.writeByte(packet.getPacketType());
097 pd.writeInt(count);
098 rc.rewind();
099 return rc;
100 }
101 return null;
102 }
103
104 /**
105 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
106 * BytesOutputStream
107 *
108 * @param packet
109 * @return a byte array representing the packet using some wire protocol
110 * @throws IOException
111 */
112 public byte[] toBytes(Packet packet) throws IOException {
113
114 byte[] data = null;
115 PacketWriter writer = getWriter(packet);
116 if (writer != null) {
117
118 // Try to guess the right size.
119 ByteArrayOutputStream internalBytesOut = new ByteArrayOutputStream( 50+ (packet.getMemoryUsage()==0 ? 1024 : packet.getMemoryUsage()) );
120 DataOutputStream internalDataOut = new DataOutputStream(internalBytesOut);
121
122 internalBytesOut.reset();
123 internalDataOut.writeByte(packet.getPacketType());
124 internalDataOut.writeInt(-1);//the length
125 writer.writePacket(packet, internalDataOut);
126 internalDataOut.flush();
127 data = internalBytesOut.toByteArray();
128 // lets subtract the header offset from the length
129 int length = data.length - 5;
130 packet.setMemoryUsage(length);
131 //write in the length to the data
132 data[1] = (byte) ((length >>> 24) & 0xFF);
133 data[2] = (byte) ((length >>> 16) & 0xFF);
134 data[3] = (byte) ((length >>> 8) & 0xFF);
135 data[4] = (byte) ((length >>> 0) & 0xFF);
136 }
137 return data;
138 }
139
140 protected Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
141 Packet packet = reader.createPacket();
142 int length = dataIn.readInt();
143 packet.setMemoryUsage(length);
144 reader.buildPacket(packet, dataIn);
145 return packet;
146 }
147
148 /**
149 * @param dataIn
150 * @return
151 * @throws IOException
152 */
153 public Packet readPacket(org.activeio.Packet dataIn) throws IOException {
154 return readPacket(new DataInputStream(new PacketInputStream(dataIn)));
155 }
156
157 protected void handleCachedValue(CachedValue cv) {
158 throw new IllegalStateException("Value caching is not supported.");
159 }
160
161 public Object getValueFromReadCache(short key) {
162 throw new IllegalStateException("Value caching is not supported.");
163 }
164
165 short getWriteCachedKey(Object value) {
166 throw new IllegalStateException("Value caching is not supported.");
167 }
168
169 public boolean isCachingEnabled() {
170 return false;
171 }
172
173 public WireFormat copy() {
174 return new StatelessDefaultWireFormat();
175 }
176
177 private Object readResolve() throws ObjectStreamException {
178 return new DefaultWireFormat();
179 }
180
181 }