001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.activemq.transport.activeio;
018
019 import java.io.IOException;
020
021 import org.activeio.Packet;
022 import org.activeio.PacketData;
023 import org.activeio.packet.AppendedPacket;
024 import org.activeio.packet.EOSPacket;
025
026 /**
027 * @version $Revision: 1.1.1.1 $
028 */
029 abstract public class PacketAggregator {
030
031 private static final int HEADER_LENGTH = 5;
032
033 Packet incompleteUpPacket;
034 boolean headerLoaded;
035 private int upPacketLength;
036
037 public void addRawPacket(Packet packet) throws IOException {
038
039 // Passthrough the EOS packet.
040 if( packet == EOSPacket.EOS_PACKET ) {
041 packetAssembled(packet);
042 return;
043 }
044
045 if (incompleteUpPacket != null) {
046 packet = AppendedPacket.join(incompleteUpPacket, packet);
047 incompleteUpPacket = null;
048 }
049
050 while (true) {
051
052 if (!headerLoaded) {
053 headerLoaded = packet.remaining() >= HEADER_LENGTH;
054 if( headerLoaded ) {
055 PacketData data = new PacketData(packet.duplicate());
056 data.readByte();
057 upPacketLength = data.readInt();
058 if( upPacketLength < 0 ) {
059 throw new IOException("Up packet lenth was invalid: "+upPacketLength);
060 }
061 upPacketLength+=HEADER_LENGTH;
062 }
063 if( !headerLoaded )
064 break;
065 }
066
067 if (packet.remaining() < upPacketLength )
068 break;
069
070 // Get ready to create a slice to send up.
071 int origLimit = packet.limit();
072 packet.limit(upPacketLength);
073 packetAssembled(packet.slice());
074
075 // Get a slice of the remaining since that will dump
076 // the first packets of an AppendedPacket
077 packet.position(upPacketLength);
078 packet.limit(origLimit);
079 packet = packet.slice();
080
081 // Need to load a header again now.
082 headerLoaded = false;
083 }
084 if (packet.hasRemaining()) {
085 incompleteUpPacket = packet;
086 }
087
088 }
089
090 protected abstract void packetAssembled(Packet packet);
091 }