001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019
020 package org.activemq.io.impl;
021 import java.io.DataOutput;
022 import java.io.IOException;
023
024 import org.activemq.message.AbstractPacket;
025 import org.activemq.message.ActiveMQDestination;
026 import org.activemq.message.ActiveMQXid;
027 import org.activemq.message.MessageAck;
028 import org.activemq.message.Packet;
029 import org.activemq.util.BitArray;
030
031 /**
032 * Writes a ConsumerInfo object to a Stream
033 */
034
035 public class MessageAckWriter extends AbstractPacketWriter {
036 private AbstractDefaultWireFormat wireFormat;
037
038 MessageAckWriter(AbstractDefaultWireFormat wf){
039 this.wireFormat = wf;
040 }
041
042 MessageAckWriter(){
043 }
044
045 /**
046 * Return the type of Packet
047 *
048 * @return integer representation of the type of Packet
049 */
050
051 public int getPacketType() {
052 return Packet.ACTIVEMQ_MSG_ACK;
053 }
054
055 /**
056 * Write a Packet instance to data output stream
057 *
058 * @param packet the instance to be seralized
059 * @param dataOut the output stream
060 * @throws IOException thrown if an error occurs
061 */
062
063 public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
064 MessageAck ack = (MessageAck) packet;
065
066 boolean cachingEnabled = wireFormat != null ? wireFormat.isCachingEnabled() : false;
067 boolean longSequence = ack.getSequenceNumber() > Integer.MAX_VALUE;
068
069
070 Object[] visited = ack.getBrokersVisited();
071 boolean writeVisited = visited != null && visited.length > 0;
072 BitArray ba = ack.getBitArray();
073 ba.reset();
074 ba.set(AbstractPacket.RECEIPT_REQUIRED_INDEX, ack.isReceiptRequired());
075 ba.set(AbstractPacket.BROKERS_VISITED_INDEX,writeVisited);
076 ba.set(MessageAck.MESSAGE_READ_INDEX, ack.isMessageRead());
077 ba.set(MessageAck.TRANSACTION_ID_INDEX, ack.isPartOfTransaction());
078 ba.set(MessageAck.XA_TRANS_INDEX, ack.isXaTransacted());
079 ba.set(MessageAck.PERSISTENT_INDEX,ack.isPersistent());
080 ba.set(MessageAck.EXPIRED_INDEX,ack.isExpired());
081 ba.set(MessageAck.EXTERNAL_MESSAGE_ID_INDEX, ack.isExternalMessageId());
082 ba.set(MessageAck.CACHED_VALUES_INDEX,cachingEnabled);
083 ba.set(MessageAck.LONG_SEQUENCE_INDEX, longSequence);
084 ba.writeToStream(dataOut);
085
086 if (ack.isReceiptRequired()){
087 dataOut.writeShort(ack.getId());
088 }
089 if (ack.isExternalMessageId()){
090 writeUTF(ack.getMessageID(),dataOut);
091 }else {
092 if (cachingEnabled){
093 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getProducerKey()));
094 }else{
095 writeUTF(ack.getProducerKey(),dataOut);
096 }
097 if (longSequence){
098 dataOut.writeLong(ack.getSequenceNumber());
099 }else {
100 dataOut.writeInt((int)ack.getSequenceNumber());
101 }
102 }
103 if (writeVisited){
104 dataOut.writeShort(visited.length);
105 for(int i =0; i < visited.length; i++){
106 final String brokerName = visited[i].toString();
107 if (brokerName != null) {
108 dataOut.writeUTF(brokerName);
109 }
110 }
111 }
112
113 if (ack.isPartOfTransaction()) {
114 if (cachingEnabled){
115 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getTransactionId()));
116 } else {
117 if( ack.isXaTransacted()) {
118 ActiveMQXid xid = (ActiveMQXid) ack.getTransactionId();
119 xid.write(dataOut);
120 } else {
121 super.writeUTF((String) ack.getTransactionId(), dataOut);
122 }
123 }
124 }
125
126 if (cachingEnabled){
127 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getConsumerId()));
128 dataOut.writeShort(wireFormat.getWriteCachedKey(ack.getDestination()));
129 }else {
130 super.writeUTF(ack.getConsumerId(), dataOut);
131 ActiveMQDestination.writeToStream((ActiveMQDestination) ack.getDestination(), dataOut);
132 }
133 }
134
135
136 }