001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.message;
020
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.ArrayList;
024 import java.util.StringTokenizer;
025
026 import org.activemq.util.BitArray;
027 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
028
029 /**
030 * Abstract class for a transportable Packet
031 *
032 * @version $Revision: 1.1.1.1 $
033 */
034 public abstract class AbstractPacket implements Packet {
035
036 /**
037 * Message flag indexes (used for writing/reading to/from a Stream
038 */
039 public static final int RECEIPT_REQUIRED_INDEX = 0;
040 public static final int BROKERS_VISITED_INDEX =1;
041 private short id = 0;
042 protected BitArray bitArray;
043 protected transient int cachedHashCode = -1;
044 private boolean receiptRequired;
045 private transient int memoryUsage = 2048;
046 private transient int memoryUsageReferenceCount;
047
048 private CopyOnWriteArraySet brokersVisited;
049
050 protected AbstractPacket(){
051 this.bitArray = new BitArray();
052 }
053
054 /**
055 * @return the unique id for this Packet
056 */
057 public short getId() {
058 return this.id;
059 }
060
061 /**
062 * Set the unique id for this Packet
063 *
064 * @param newId
065 */
066 public void setId(short newId) {
067 this.id = newId;
068 }
069
070 /**
071 * @return true if a Recipt is required
072 */
073 public boolean isReceiptRequired() {
074 return this.receiptRequired;
075 }
076
077 /**
078 * @return false since most packets are not receipt packets
079 */
080 public boolean isReceipt() {
081 return false;
082 }
083
084 /**
085 * Set if a Recipt if required on receiving this Packet
086 *
087 * @param value
088 */
089 public void setReceiptRequired(boolean value) {
090 this.receiptRequired = value;
091 }
092
093 /**
094 * Retrieve if a JMS Message type or not
095 *
096 * @return true if it is a JMS Message
097 */
098 public boolean isJMSMessage() {
099 return false;
100 }
101
102 /**
103 * Tests equality with another instance
104 *
105 * @param obj - the other instance to test equality with
106 * @return Returns true if the objects are equilvant
107 */
108 public boolean equals(Object obj) {
109 boolean result = this == obj;
110 if (!result && obj != null && obj instanceof AbstractPacket) {
111 AbstractPacket other = (AbstractPacket) obj;
112 result = other.id == this.id;
113 }
114 return result;
115 }
116
117 /**
118 * @return Returns hash code for this instance
119 */
120 public int hashCode() {
121 return this.id;
122 }
123
124 /**
125 * Get a hint about how much memory this Packet is consuming
126 *
127 * @return an aproximation of the current memory used by this instance
128 */
129 public int getMemoryUsage() {
130 return memoryUsage;
131 }
132
133 /**
134 * Set a hint about how mujch memory this packet is consuming
135 *
136 * @param newMemoryUsage
137 */
138 public void setMemoryUsage(int newMemoryUsage) {
139 this.memoryUsage = newMemoryUsage;
140 }
141
142 /**
143 * Increment reference count for bounded memory collections
144 *
145 * @return the incremented reference value
146 * @see org.activemq.io.util.MemoryBoundedQueue
147 */
148 public synchronized int incrementMemoryReferenceCount() {
149 return ++memoryUsageReferenceCount;
150 }
151
152 /**
153 * Decrement reference count for bounded memory collections
154 *
155 * @return the decremented reference value
156 * @see org.activemq.io.util.MemoryBoundedQueue
157 */
158 public synchronized int decrementMemoryReferenceCount() {
159 return --memoryUsageReferenceCount;
160 }
161
162 /**
163 * @return the current reference count for bounded memory collections
164 * @see org.activemq.io.util.MemoryBoundedQueue
165 */
166 public synchronized int getMemoryUsageReferenceCount() {
167 return memoryUsageReferenceCount;
168 }
169
170 /**
171 * As the packet passes through the broker add the broker to the visited list
172 *
173 * @param brokerName the name of the broker
174 */
175 public void addBrokerVisited(String brokerName) {
176 if (brokerName == null || brokerName.trim().equals("")) {
177 throw new IllegalArgumentException("Broker name cannot be empty or null");
178 }
179 initializeBrokersVisited();
180 brokersVisited.add(brokerName);
181 }
182
183 /**
184 * clear list of brokers visited
185 */
186 public void clearBrokersVisited(){
187 brokersVisited = null;
188 }
189
190 /**
191 * test to see if the named broker has already seen this packet
192 *
193 * @param brokerName the name of the broker
194 * @return true if the packet has visited the broker
195 */
196 public boolean hasVisited(String brokerName) {
197 if (brokersVisited == null){
198 return false;
199 }
200 return brokersVisited.contains(brokerName);
201 }
202
203 /**
204 * @return Returns the brokersVisited.
205 */
206 public String getBrokersVisitedAsString() {
207 String result = "";
208 if (brokersVisited != null && !brokersVisited.isEmpty()){
209 for (Iterator i = brokersVisited.iterator(); i.hasNext();){
210 result += i.next().toString() + ",";
211 }
212 }
213 return result;
214 }
215
216 public void setBrokersVisitedAsString(String value) {
217 initializeBrokersVisited();
218 StringTokenizer enm = new StringTokenizer(value, ",");
219 while (enm.hasMoreElements()) {
220 brokersVisited.add(enm.nextToken());
221 }
222 }
223
224 /**
225 * @return pretty print of this Packet
226 */
227 public String toString() {
228 return getPacketTypeAsString(getPacketType()) + ": id = " + getId();
229 }
230
231
232 public static String getPacketTypeAsString(int type) {
233 String packetTypeStr = "";
234 switch (type) {
235 case ACTIVEMQ_MESSAGE:
236 packetTypeStr = "ACTIVEMQ_MESSAGE";
237 break;
238 case ACTIVEMQ_TEXT_MESSAGE:
239 packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
240 break;
241 case ACTIVEMQ_OBJECT_MESSAGE:
242 packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
243 break;
244 case ACTIVEMQ_BYTES_MESSAGE:
245 packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
246 break;
247 case ACTIVEMQ_STREAM_MESSAGE:
248 packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
249 break;
250 case ACTIVEMQ_MAP_MESSAGE:
251 packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
252 break;
253 case ACTIVEMQ_MSG_ACK:
254 packetTypeStr = "ACTIVEMQ_MSG_ACK";
255 break;
256 case RECEIPT_INFO:
257 packetTypeStr = "RECEIPT_INFO";
258 break;
259 case CONSUMER_INFO:
260 packetTypeStr = "CONSUMER_INFO";
261 break;
262 case PRODUCER_INFO:
263 packetTypeStr = "PRODUCER_INFO";
264 break;
265 case TRANSACTION_INFO:
266 packetTypeStr = "TRANSACTION_INFO";
267 break;
268 case XA_TRANSACTION_INFO:
269 packetTypeStr = "XA_TRANSACTION_INFO";
270 break;
271 case ACTIVEMQ_BROKER_INFO:
272 packetTypeStr = "ACTIVEMQ_BROKER_INFO";
273 break;
274 case ACTIVEMQ_CONNECTION_INFO:
275 packetTypeStr = "ACTIVEMQ_CONNECTION_INFO";
276 break;
277 case SESSION_INFO:
278 packetTypeStr = "SESSION_INFO";
279 break;
280 case DURABLE_UNSUBSCRIBE:
281 packetTypeStr = "DURABLE_UNSUBSCRIBE";
282 break;
283 case RESPONSE_RECEIPT_INFO:
284 packetTypeStr = "RESPONSE_RECEIPT_INFO";
285 break;
286 case INT_RESPONSE_RECEIPT_INFO:
287 packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
288 break;
289 case CAPACITY_INFO:
290 packetTypeStr = "CAPACITY_INFO";
291 break;
292 case CAPACITY_INFO_REQUEST:
293 packetTypeStr = "CAPACITY_INFO_REQUEST";
294 break;
295 case WIRE_FORMAT_INFO:
296 packetTypeStr = "WIRE_FORMAT_INFO";
297 break;
298 case KEEP_ALIVE:
299 packetTypeStr = "KEEP_ALIVE";
300 break;
301 case CACHED_VALUE_COMMAND:
302 packetTypeStr = "CachedValue";
303 break;
304 default :
305 packetTypeStr = "UNKNOWN PACKET TYPE: " + type;
306 }
307 return packetTypeStr;
308 }
309
310 /**
311 * A helper method used when implementing equals() which returns true if the objects are identical or equal handling
312 * nulls properly
313 * @param left
314 * @param right
315 *
316 * @return true if the objects are the same or equal or both null
317 */
318 protected boolean equals(Object left, Object right) {
319 return left == right || (left != null && left.equals(right));
320 }
321
322 /**
323 * Initializes another message with current values from this instance
324 *
325 * @param other the other ActiveMQMessage to initialize
326 */
327 protected void initializeOther(AbstractPacket other) {
328 initializeBrokersVisited();
329 other.id = this.id;
330 other.receiptRequired = this.receiptRequired;
331 other.memoryUsage = this.memoryUsage;
332 CopyOnWriteArraySet set = this.brokersVisited;
333 if (set != null && !set.isEmpty()){
334 other.brokersVisited = new CopyOnWriteArraySet(set);
335 }
336 }
337
338 synchronized void initializeBrokersVisited(){
339 if (this.brokersVisited == null){
340 this.brokersVisited = new CopyOnWriteArraySet();
341 }
342 }
343
344 /**
345 * @return Returns the brokersVisited.
346 */
347 public Object[] getBrokersVisited() {
348 if (brokersVisited == null || brokersVisited.isEmpty()){
349 return null;
350 }
351 return brokersVisited.toArray();
352 }
353
354 /**
355 * @return Returns the bitArray.
356 */
357 public BitArray getBitArray() {
358 return bitArray;
359 }
360 /**
361 * @param bitArray The bitArray to set.
362 */
363 public void setBitArray(BitArray bitArray) {
364 this.bitArray = bitArray;
365 }
366 }