001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport.jabber;
019
020 import org.activemq.io.AbstractWireFormat;
021 import org.activemq.io.WireFormat;
022 import org.activemq.io.util.ByteArray;
023 import org.activemq.message.ActiveMQBytesMessage;
024 import org.activemq.message.ActiveMQDestination;
025 import org.activemq.message.ActiveMQMessage;
026 import org.activemq.message.ActiveMQObjectMessage;
027 import org.activemq.message.ActiveMQTextMessage;
028 import org.activemq.message.ConnectionInfo;
029 import org.activemq.message.ConsumerInfo;
030 import org.activemq.message.Packet;
031 import org.activemq.util.IdGenerator;
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034
035 import javax.jms.Destination;
036 import javax.jms.JMSException;
037 import javax.xml.namespace.QName;
038 import javax.xml.stream.XMLStreamConstants;
039 import javax.xml.stream.XMLStreamException;
040 import javax.xml.stream.XMLStreamReader;
041 import java.io.DataInput;
042 import java.io.DataOutput;
043 import java.io.IOException;
044 import java.io.PrintWriter;
045 import java.io.Serializable;
046 import java.net.InetAddress;
047 import java.util.Iterator;
048 import java.util.List;
049 import java.util.Map;
050
051 /**
052 * A wire format which uses XMPP format of messages
053 *
054 * @version $Revision: 1.1 $
055 */
056 public class JabberWireFormat extends AbstractWireFormat {
057 private static final Log log = LogFactory.getLog(JabberWireFormat.class);
058
059 private static final String NAMESPACE = "http://etherx.jabber.org/streams";
060 private static final String QUEUE_PREFIX = "queue:";
061 private static final String TOPIC_PREFIX = "topic:";
062 private static final String TEMP_QUEUE_PREFIX = "tempQueue:";
063 private static final String TEMP_TOPIC_PREFIX = "tempTopic:";
064
065 private static final QName STREAM_QNAME = new QName(NAMESPACE, "stream", "stream");
066 private static final QName MESSAGE_QNAME = new QName("jabber:client","message","message");
067 private static final QName AUTH_QNAME = new QName("jabber:iq:auth", "query", "query");
068
069 private IdGenerator idGenerator = new IdGenerator();
070 private String clientID = idGenerator.generateId();
071 private ConnectionInfo connectionInfo;
072 private PrintWriter writer;
073 private String userName;
074 private boolean validStream = false;
075
076
077 public WireFormat copy() {
078 return new JabberWireFormat();
079 }
080
081 public Packet readPacket(int firstByte, DataInput in) throws IOException {
082 return null; /** TODO */
083 }
084
085
086 /**
087 * Reads a packet from the XML stream
088 * @param reader
089 * @param returnPackets
090 * @throws XMLStreamException
091 * @throws JMSException
092 */
093 public void readPacket(XMLStreamReader reader, List returnPackets) throws XMLStreamException, JMSException {
094 String sessionId = getAttributeValue("id", reader);
095
096 if (reader.next() == XMLStreamConstants.START_ELEMENT) {
097
098 QName name = reader.getName();
099
100 if (!validStream) {
101 if (name.equals(STREAM_QNAME)) {
102 validStream = true;
103 }
104 else {
105 String errStr = "Bad initial QName for stream. Received: " + name + " while expecting: "
106 + STREAM_QNAME;
107 log.warn(errStr);
108 throw new JMSException(errStr);
109 }
110 }
111 else {
112 QName test = new QName("jabber:iq:auth", "query", "query");
113
114 if (name.equals(AUTH_QNAME)) {
115 if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){
116 name = reader.getName();
117 this.userName = reader.getElementText();
118
119 //skip past the end
120 if (reader.hasNext()) {
121 reader.next();
122 }
123 if (reader.hasNext() && reader.next()==XMLStreamConstants.START_ELEMENT){
124 if (sessionId != null){
125 writer.println(" <iq id='" + sessionId +"' type='result'/>");
126 writer.flush();
127 }
128 }else {
129 //write back a request for the password
130 writer.println("<iq id='" + sessionId + "'");
131 writer.println(" type = 'result'>");
132 writer.println("<query xmlns='jabber:iq:auth'><username>" + this.userName + "</username><password/><digest/><resource/></query></iq>");
133 writer.flush();
134 returnPackets.add(createConnectionInfo());
135 returnPackets.add(createConsumerPacket());
136 }
137 }
138 }else if (name.equals(MESSAGE_QNAME)){
139 Packet pack = readMessage(reader);
140 if (pack != null){
141 returnPackets.add(pack);
142 }
143 }else {
144 //general catch all - just say ok ..
145 if (sessionId != null){
146 writer.println(" <iq id='" + sessionId +"' type='result'/>");
147 writer.flush();
148 }
149 }
150 }
151 }
152 }
153
154 private String getAttributeValue(String attributeName, XMLStreamReader reader) {
155 String result = null;
156 for (int i = 0;i < reader.getAttributeCount();i++) {
157 if (reader.getAttributeName(i).toString().equals(attributeName)) {
158 result = reader.getAttributeValue(i);
159 break;
160 }
161 }
162 return result;
163 }
164
165
166
167
168 public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
169 switch (packet.getPacketType()) {
170 case Packet.ACTIVEMQ_MESSAGE:
171 writeMessage((ActiveMQMessage) packet, "", out);
172 break;
173
174 case Packet.ACTIVEMQ_TEXT_MESSAGE:
175 writeTextMessage((ActiveMQTextMessage) packet, out);
176 break;
177
178 case Packet.ACTIVEMQ_BYTES_MESSAGE:
179 writeBytesMessage((ActiveMQBytesMessage) packet, out);
180 break;
181
182 case Packet.ACTIVEMQ_OBJECT_MESSAGE:
183 writeObjectMessage((ActiveMQObjectMessage) packet, out);
184 break;
185
186 case Packet.ACTIVEMQ_CONNECTION_INFO:
187
188 case Packet.ACTIVEMQ_MAP_MESSAGE:
189 case Packet.ACTIVEMQ_STREAM_MESSAGE:
190
191
192 case Packet.ACTIVEMQ_BROKER_INFO:
193 case Packet.ACTIVEMQ_MSG_ACK:
194 case Packet.CONSUMER_INFO:
195 case Packet.DURABLE_UNSUBSCRIBE:
196 case Packet.INT_RESPONSE_RECEIPT_INFO:
197 case Packet.PRODUCER_INFO:
198 case Packet.RECEIPT_INFO:
199 case Packet.RESPONSE_RECEIPT_INFO:
200 case Packet.SESSION_INFO:
201 case Packet.TRANSACTION_INFO:
202 case Packet.XA_TRANSACTION_INFO:
203 default:
204 log.debug("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
205 }
206 writer.flush();
207 return null;
208 }
209
210 /**
211 * Can this wireformat process packets of this version
212 *
213 * @param version the version number to test
214 * @return true if can accept the version
215 */
216 public boolean canProcessWireFormatVersion(int version) {
217 return true;
218 }
219
220 /**
221 * @return the current version of this wire format
222 */
223 public int getCurrentWireFormatVersion() {
224 return 1;
225 }
226
227 public PrintWriter getWriter() {
228 return writer;
229 }
230
231 public void setWriter(PrintWriter writer) {
232 this.writer = writer;
233 }
234
235 // Implementation methods
236 //-------------------------------------------------------------------------
237 protected Packet createConnectionInfo() {
238 connectionInfo = new ConnectionInfo();
239 connectionInfo.setStarted(true);
240 connectionInfo.setClientId(this.clientID);
241 connectionInfo.setClientVersion("" + getCurrentWireFormatVersion());
242 connectionInfo.setUserName(userName);
243 return connectionInfo;
244 }
245
246 protected Packet createConsumerPacket(){
247 ConsumerInfo info = new ConsumerInfo();
248 info.setClientId(this.clientID);
249 info.setConsumerNo(0);
250 info.setStarted(true);
251 info.setStartTime(System.currentTimeMillis());
252 info.setDestination(createDestination("chat",this.userName));
253 return info;
254 }
255
256
257
258
259 protected void initialize() throws IOException {
260 //start the stream -
261 String hostName = InetAddress.getLocalHost().toString();
262 writer.println("<?xml version='1.0'?>");
263 writer.println("<stream:stream");
264 writer.println(" xmlns='jabber:client'");
265 writer.println(" xml:lang='en'");
266 writer.println(" xmlns:stream='http://etherx.jabber.org/streams'");
267 writer.println(" from='" + hostName + "'");
268 writer.println(" id='" + clientID + "'>");
269 writer.flush();
270 }
271
272 protected Packet readMessage(XMLStreamReader reader) throws XMLStreamException, JMSException {
273 ActiveMQTextMessage message = new ActiveMQTextMessage();
274 message.setJMSMessageID(idGenerator.generateId());
275 QName name = reader.getName();
276 String to = getAttributeValue("to", reader);
277 String type = getAttributeValue("type",reader);
278
279 if (type != null){
280 message.setJMSType(type);
281 }
282
283 if (to != null && to.length() > 0) {
284 message.setJMSDestination(createDestination(type,to));
285 }
286
287 if (this.userName != null && this.userName .length() > 0) {
288 message.setJMSReplyTo(createDestination("chat",this.userName));
289 }
290
291 while (reader.hasNext()) {
292 switch (reader.nextTag()) {
293 case XMLStreamConstants.START_ELEMENT:
294 if (!readElement(reader, message)) {
295 log.debug("Unknown element: " + reader.getName());
296 }
297 break;
298 case XMLStreamConstants.END_ELEMENT:
299 case XMLStreamConstants.END_DOCUMENT:
300 return message;
301 }
302 }
303 return message;
304 }
305
306 protected boolean readElement(XMLStreamReader reader, ActiveMQTextMessage message) throws JMSException, XMLStreamException {
307 QName name = reader.getName();
308 String localPart = name.getLocalPart();
309 if (localPart.equals("body")) {
310 message.setText(reader.getElementText());
311 return true;
312 }
313 else if (localPart.equals("thread")) {
314 message.setJMSCorrelationID(reader.getElementText());
315 return true;
316 }
317 else {
318 return false;
319 }
320 }
321
322 protected String readXMLAsText(XMLStreamReader reader) throws XMLStreamException {
323 StringBuffer buffer = new StringBuffer();
324 int elementCount = 0;
325 while (reader.hasNext()) {
326 switch (reader.nextTag()) {
327 case XMLStreamConstants.START_ELEMENT:
328 if (elementCount++ > 0) {
329 writeStartElement(reader);
330 }
331 break;
332
333 case XMLStreamConstants.CHARACTERS:
334 buffer.append(reader.getText());
335 break;
336
337 case XMLStreamConstants.END_ELEMENT:
338 if (--elementCount <= 0) {
339 return buffer.toString();
340 }
341 writeEndElement(reader);
342 break;
343
344 case XMLStreamConstants.END_DOCUMENT:
345 return buffer.toString();
346 }
347 }
348 return buffer.toString();
349 }
350
351 protected void writeStartElement(XMLStreamReader reader) {
352 writer.print("<");
353 writeQName(reader.getName());
354 for (int i = 0, size = reader.getNamespaceCount(); i < size; i++) {
355 writer.print("xmlns");
356 String prefix = reader.getNamespacePrefix(i);
357 if (prefix != null && prefix.length() > 0) {
358 writer.print(":");
359 writer.print(prefix);
360 }
361 writer.print("='");
362 writer.print(reader.getNamespaceURI(i));
363 writer.print("'");
364 }
365 for (int i = 0, size = reader.getAttributeCount(); i < size; i++) {
366 writer.print("xmlns");
367 writeQName(reader.getAttributeName(i));
368 writer.print("='");
369 writer.print(reader.getAttributeValue(i));
370 writer.print("'");
371 }
372 writer.println(">");
373 }
374
375 protected void writeEndElement(XMLStreamReader reader) {
376 writer.print("</");
377 writeQName(reader.getName());
378 writer.println(">");
379 }
380
381 protected void writeQName(QName name) {
382 String prefix = name.getPrefix();
383 if (prefix != null && prefix.length() > 0) {
384 writer.print(prefix);
385 writer.print(":");
386 }
387 writer.print(name.getLocalPart());
388 }
389
390 protected ActiveMQDestination createDestination(String typeName,String text) {
391 int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
392 if (text.startsWith(TOPIC_PREFIX)) {
393 type = ActiveMQDestination.ACTIVEMQ_TOPIC;
394 text = text.substring(TOPIC_PREFIX.length());
395 }
396 else if (text.startsWith(QUEUE_PREFIX)) {
397 type = ActiveMQDestination.ACTIVEMQ_QUEUE;
398 text = text.substring(QUEUE_PREFIX.length());
399 }
400 else if (text.startsWith(TEMP_QUEUE_PREFIX)) {
401 type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
402 text = text.substring(TEMP_QUEUE_PREFIX.length());
403 }
404 else if (text.startsWith(TEMP_TOPIC_PREFIX)) {
405 type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
406 text = text.substring(TEMP_TOPIC_PREFIX.length());
407 }else {
408 if (typeName != null){
409 if (typeName.equals("groupchat")){
410 type = ActiveMQDestination.ACTIVEMQ_TOPIC;
411 }
412 //else default is a queue - (assume default typeName is 'chat')
413 }
414 }
415 text = text.trim();
416 if (text.length() == 0) {
417 return null;
418 }
419 return ActiveMQDestination.createDestination(type, text);
420 }
421
422 protected String toString(Destination destination) {
423 if (destination instanceof ActiveMQDestination) {
424 ActiveMQDestination activeDestination = (ActiveMQDestination) destination;
425 String physicalName = activeDestination.getPhysicalName();
426 switch (activeDestination.getDestinationType()) {
427 case ActiveMQDestination.ACTIVEMQ_QUEUE:
428 return QUEUE_PREFIX + physicalName;
429
430 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
431 return TEMP_QUEUE_PREFIX + physicalName;
432
433 case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
434 return TEMP_TOPIC_PREFIX + physicalName;
435 }
436 return physicalName;
437 }
438 return destination != null ? destination.toString() : "";
439 }
440
441 protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
442 Serializable object = message.getObject();
443 String text = (object != null) ? object.toString() : "";
444 writeMessage(message, text, out);
445 }
446
447 protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
448 writeMessage(message, message.getText(), out);
449 }
450
451 protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
452 ByteArray data = message.getBodyAsBytes();
453 String text = encodeBinary(data.getBuf(), data.getOffset(), data.getLength());
454 writeMessage(message, text, out);
455 }
456
457 protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
458 String type = getXmppType(message);
459
460 writer.print("<");
461 writer.print(type);
462 writer.print(" to='");
463 writer.print(toString(message.getJMSDestination()));
464 writer.print("' from='");
465 writer.print(toString(message.getJMSReplyTo()));
466 String messageID = message.getJMSMessageID();
467 if (messageID != null) {
468 writer.print("' id='");
469 writer.print(messageID);
470 }
471
472 Map properties = message.getProperties();
473 if (properties != null) {
474 for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
475 Map.Entry entry = (Map.Entry) iter.next();
476 Object key = entry.getKey();
477 Object value = entry.getValue();
478 if (value != null) {
479 writer.print("' ");
480 writer.print(key.toString());
481 writer.print("='");
482 writer.print(value.toString());
483 }
484 }
485 }
486
487 writer.println("'>");
488
489 String id = message.getJMSCorrelationID();
490 if (id != null) {
491 writer.print("<thread>");
492 writer.print(id);
493 writer.print("</thread>");
494 }
495 writer.print("<body>");
496 writer.print(body);
497 writer.println("</body>");
498 writer.print("</");
499 writer.print(type);
500 writer.println(">");
501 }
502
503 protected String encodeBinary(byte[] data, int offset, int length) {
504 // TODO
505 throw new RuntimeException("Not implemented yet!");
506 }
507
508 protected String getXmppType(ActiveMQMessage message) {
509 String type = message.getJMSType();
510 if (type == null) {
511 type = "message";
512 }
513 return type;
514 }
515 }