001/** 002 * 003 * Copyright the original author or authors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smackx.bytestreams.ibb; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.net.SocketTimeoutException; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.TimeUnit; 026 027import org.jivesoftware.smack.SmackException.NotConnectedException; 028import org.jivesoftware.smack.StanzaListener; 029import org.jivesoftware.smack.XMPPConnection; 030import org.jivesoftware.smack.filter.AndFilter; 031import org.jivesoftware.smack.filter.StanzaFilter; 032import org.jivesoftware.smack.filter.StanzaTypeFilter; 033import org.jivesoftware.smack.packet.IQ; 034import org.jivesoftware.smack.packet.Message; 035import org.jivesoftware.smack.packet.Stanza; 036import org.jivesoftware.smack.packet.XMPPError; 037import org.jivesoftware.smack.util.stringencoder.Base64; 038 039import org.jivesoftware.smackx.bytestreams.BytestreamSession; 040import org.jivesoftware.smackx.bytestreams.ibb.packet.Close; 041import org.jivesoftware.smackx.bytestreams.ibb.packet.Data; 042import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension; 043import org.jivesoftware.smackx.bytestreams.ibb.packet.Open; 044 045import org.jxmpp.jid.Jid; 046 047/** 048 * InBandBytestreamSession class represents an In-Band Bytestream session. 049 * <p> 050 * In-band bytestreams are bidirectional and this session encapsulates the streams for both 051 * directions. 052 * <p> 053 * Note that closing the In-Band Bytestream session will close both streams. If both streams are 054 * closed individually the session will be closed automatically once the second stream is closed. 055 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed 056 * automatically if one of them is closed. 057 * 058 * @author Henning Staib 059 */ 060public class InBandBytestreamSession implements BytestreamSession { 061 062 /* XMPP connection */ 063 private final XMPPConnection connection; 064 065 /* the In-Band Bytestream open request for this session */ 066 private final Open byteStreamRequest; 067 068 /* 069 * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream) 070 */ 071 private IBBInputStream inputStream; 072 073 /* 074 * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream) 075 */ 076 private IBBOutputStream outputStream; 077 078 /* JID of the remote peer */ 079 private Jid remoteJID; 080 081 /* flag to close both streams if one of them is closed */ 082 private boolean closeBothStreamsEnabled = false; 083 084 /* flag to indicate if session is closed */ 085 private boolean isClosed = false; 086 087 /** 088 * Constructor. 089 * 090 * @param connection the XMPP connection 091 * @param byteStreamRequest the In-Band Bytestream open request for this session 092 * @param remoteJID JID of the remote peer 093 */ 094 protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest, 095 Jid remoteJID) { 096 this.connection = connection; 097 this.byteStreamRequest = byteStreamRequest; 098 this.remoteJID = remoteJID; 099 100 // initialize streams dependent to the uses stanza type 101 switch (byteStreamRequest.getStanza()) { 102 case IQ: 103 this.inputStream = new IQIBBInputStream(); 104 this.outputStream = new IQIBBOutputStream(); 105 break; 106 case MESSAGE: 107 this.inputStream = new MessageIBBInputStream(); 108 this.outputStream = new MessageIBBOutputStream(); 109 break; 110 } 111 112 } 113 114 @Override 115 public InputStream getInputStream() { 116 return this.inputStream; 117 } 118 119 @Override 120 public OutputStream getOutputStream() { 121 return this.outputStream; 122 } 123 124 @Override 125 public int getReadTimeout() { 126 return this.inputStream.readTimeout; 127 } 128 129 @Override 130 public void setReadTimeout(int timeout) { 131 if (timeout < 0) { 132 throw new IllegalArgumentException("Timeout must be >= 0"); 133 } 134 this.inputStream.readTimeout = timeout; 135 } 136 137 /** 138 * Returns whether both streams should be closed automatically if one of the streams is closed. 139 * Default is <code>false</code>. 140 * 141 * @return <code>true</code> if both streams will be closed if one of the streams is closed, 142 * <code>false</code> if both streams can be closed independently. 143 */ 144 public boolean isCloseBothStreamsEnabled() { 145 return closeBothStreamsEnabled; 146 } 147 148 /** 149 * Sets whether both streams should be closed automatically if one of the streams is closed. 150 * Default is <code>false</code>. 151 * 152 * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of 153 * the streams is closed, <code>false</code> if both streams should be closed 154 * independently 155 */ 156 public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) { 157 this.closeBothStreamsEnabled = closeBothStreamsEnabled; 158 } 159 160 @Override 161 public void close() throws IOException { 162 closeByLocal(true); // close input stream 163 closeByLocal(false); // close output stream 164 } 165 166 /** 167 * This method is invoked if a request to close the In-Band Bytestream has been received. 168 * 169 * @param closeRequest the close request from the remote peer 170 * @throws NotConnectedException 171 * @throws InterruptedException 172 */ 173 protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException { 174 175 /* 176 * close streams without flushing them, because stream is already considered closed on the 177 * remote peers side 178 */ 179 this.inputStream.closeInternal(); 180 this.inputStream.cleanup(); 181 this.outputStream.closeInternal(false); 182 183 // acknowledge close request 184 IQ confirmClose = IQ.createResultIQ(closeRequest); 185 this.connection.sendStanza(confirmClose); 186 187 } 188 189 /** 190 * This method is invoked if one of the streams has been closed locally, if an error occurred 191 * locally or if the whole session should be closed. 192 * 193 * @throws IOException if an error occurs while sending the close request 194 */ 195 protected synchronized void closeByLocal(boolean in) throws IOException { 196 if (this.isClosed) { 197 return; 198 } 199 200 if (this.closeBothStreamsEnabled) { 201 this.inputStream.closeInternal(); 202 this.outputStream.closeInternal(true); 203 } 204 else { 205 if (in) { 206 this.inputStream.closeInternal(); 207 } 208 else { 209 // close stream but try to send any data left 210 this.outputStream.closeInternal(true); 211 } 212 } 213 214 if (this.inputStream.isClosed && this.outputStream.isClosed) { 215 this.isClosed = true; 216 217 // send close request 218 Close close = new Close(this.byteStreamRequest.getSessionID()); 219 close.setTo(this.remoteJID); 220 try { 221 connection.createStanzaCollectorAndSend(close).nextResultOrThrow(); 222 } 223 catch (Exception e) { 224 // Sadly we are unable to use the IOException(Throwable) constructor because this 225 // constructor is only supported from Android API 9 on. 226 IOException ioException = new IOException(); 227 ioException.initCause(e); 228 throw ioException; 229 } 230 231 this.inputStream.cleanup(); 232 233 // remove session from manager 234 // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed 235 // now to remove(byteStreamRequest.getSessionID). 236 InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID()); 237 } 238 239 } 240 241 /** 242 * IBBInputStream class is the base implementation of an In-Band Bytestream input stream. 243 * Subclasses of this input stream must provide a stanza(/packet) listener along with a stanza(/packet) filter to 244 * collect the In-Band Bytestream data packets. 245 */ 246 private abstract class IBBInputStream extends InputStream { 247 248 /* the data packet listener to fill the data queue */ 249 private final StanzaListener dataPacketListener; 250 251 /* queue containing received In-Band Bytestream data packets */ 252 protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>(); 253 254 /* buffer containing the data from one data packet */ 255 private byte[] buffer; 256 257 /* pointer to the next byte to read from buffer */ 258 private int bufferPointer = -1; 259 260 /* data packet sequence (range from 0 to 65535) */ 261 private long seq = -1; 262 263 /* flag to indicate if input stream is closed */ 264 private boolean isClosed = false; 265 266 /* flag to indicate if close method was invoked */ 267 private boolean closeInvoked = false; 268 269 /* timeout for read operations */ 270 private int readTimeout = 0; 271 272 /** 273 * Constructor. 274 */ 275 public IBBInputStream() { 276 // add data packet listener to connection 277 this.dataPacketListener = getDataPacketListener(); 278 connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter()); 279 } 280 281 /** 282 * Returns the stanza(/packet) listener that processes In-Band Bytestream data packets. 283 * 284 * @return the data stanza(/packet) listener 285 */ 286 protected abstract StanzaListener getDataPacketListener(); 287 288 /** 289 * Returns the stanza(/packet) filter that accepts In-Band Bytestream data packets. 290 * 291 * @return the data stanza(/packet) filter 292 */ 293 protected abstract StanzaFilter getDataPacketFilter(); 294 295 @Override 296 public synchronized int read() throws IOException { 297 checkClosed(); 298 299 // if nothing read yet or whole buffer has been read fill buffer 300 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 301 // if no data available and stream was closed return -1 302 if (!loadBuffer()) { 303 return -1; 304 } 305 } 306 307 // return byte and increment buffer pointer 308 return buffer[bufferPointer++] & 0xff; 309 } 310 311 @Override 312 public synchronized int read(byte[] b, int off, int len) throws IOException { 313 if (b == null) { 314 throw new NullPointerException(); 315 } 316 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 317 || ((off + len) < 0)) { 318 throw new IndexOutOfBoundsException(); 319 } 320 else if (len == 0) { 321 return 0; 322 } 323 324 checkClosed(); 325 326 // if nothing read yet or whole buffer has been read fill buffer 327 if (bufferPointer == -1 || bufferPointer >= buffer.length) { 328 // if no data available and stream was closed return -1 329 if (!loadBuffer()) { 330 return -1; 331 } 332 } 333 334 // if more bytes wanted than available return all available 335 int bytesAvailable = buffer.length - bufferPointer; 336 if (len > bytesAvailable) { 337 len = bytesAvailable; 338 } 339 340 System.arraycopy(buffer, bufferPointer, b, off, len); 341 bufferPointer += len; 342 return len; 343 } 344 345 @Override 346 public synchronized int read(byte[] b) throws IOException { 347 return read(b, 0, b.length); 348 } 349 350 /** 351 * This method blocks until a data stanza(/packet) is received, the stream is closed or the current 352 * thread is interrupted. 353 * 354 * @return <code>true</code> if data was received, otherwise <code>false</code> 355 * @throws IOException if data packets are out of sequence 356 */ 357 private synchronized boolean loadBuffer() throws IOException { 358 359 // wait until data is available or stream is closed 360 DataPacketExtension data = null; 361 try { 362 if (this.readTimeout == 0) { 363 while (data == null) { 364 if (isClosed && this.dataQueue.isEmpty()) { 365 return false; 366 } 367 data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS); 368 } 369 } 370 else { 371 data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS); 372 if (data == null) { 373 throw new SocketTimeoutException(); 374 } 375 } 376 } 377 catch (InterruptedException e) { 378 // Restore the interrupted status 379 Thread.currentThread().interrupt(); 380 return false; 381 } 382 383 // handle sequence overflow 384 if (this.seq == 65535) { 385 this.seq = -1; 386 } 387 388 // check if data packets sequence is successor of last seen sequence 389 long seq = data.getSeq(); 390 if (seq - 1 != this.seq) { 391 // packets out of order; close stream/session 392 InBandBytestreamSession.this.close(); 393 throw new IOException("Packets out of sequence"); 394 } 395 else { 396 this.seq = seq; 397 } 398 399 // set buffer to decoded data 400 buffer = data.getDecodedData(); 401 bufferPointer = 0; 402 return true; 403 } 404 405 /** 406 * Checks if this stream is closed and throws an IOException if necessary 407 * 408 * @throws IOException if stream is closed and no data should be read anymore 409 */ 410 private void checkClosed() throws IOException { 411 // Throw an exception if, and only if, this stream has been already 412 // closed by the user using the close() method 413 if (closeInvoked) { 414 // clear data queue in case additional data was received after stream was closed 415 this.dataQueue.clear(); 416 throw new IOException("Stream is closed"); 417 } 418 } 419 420 @Override 421 public boolean markSupported() { 422 return false; 423 } 424 425 @Override 426 public void close() throws IOException { 427 if (closeInvoked) { 428 return; 429 } 430 431 this.closeInvoked = true; 432 433 InBandBytestreamSession.this.closeByLocal(true); 434 } 435 436 /** 437 * This method sets the close flag and removes the data stanza(/packet) listener. 438 */ 439 private void closeInternal() { 440 if (isClosed) { 441 return; 442 } 443 isClosed = true; 444 } 445 446 /** 447 * Invoked if the session is closed. 448 */ 449 private void cleanup() { 450 connection.removeSyncStanzaListener(this.dataPacketListener); 451 } 452 453 } 454 455 /** 456 * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the 457 * data packets. 458 */ 459 private class IQIBBInputStream extends IBBInputStream { 460 461 @Override 462 protected StanzaListener getDataPacketListener() { 463 return new StanzaListener() { 464 465 private long lastSequence = -1; 466 467 @Override 468 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException { 469 // get data packet extension 470 DataPacketExtension data = ((Data) packet).getDataPacketExtension(); 471 472 /* 473 * check if sequence was not used already (see XEP-0047 Section 2.2) 474 */ 475 if (data.getSeq() <= this.lastSequence) { 476 IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, 477 XMPPError.Condition.unexpected_request); 478 connection.sendStanza(unexpectedRequest); 479 return; 480 481 } 482 483 // check if encoded data is valid (see XEP-0047 Section 2.2) 484 if (data.getDecodedData() == null) { 485 // data is invalid; respond with bad-request error 486 IQ badRequest = IQ.createErrorResponse((IQ) packet, 487 XMPPError.Condition.bad_request); 488 connection.sendStanza(badRequest); 489 return; 490 } 491 492 // data is valid; add to data queue 493 dataQueue.offer(data); 494 495 // confirm IQ 496 IQ confirmData = IQ.createResultIQ((IQ) packet); 497 connection.sendStanza(confirmData); 498 499 // set last seen sequence 500 this.lastSequence = data.getSeq(); 501 if (this.lastSequence == 65535) { 502 this.lastSequence = -1; 503 } 504 505 } 506 507 }; 508 } 509 510 @Override 511 protected StanzaFilter getDataPacketFilter() { 512 /* 513 * filter all IQ stanzas having type 'SET' (represented by Data class), containing a 514 * data stanza(/packet) extension, matching session ID and recipient 515 */ 516 return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter()); 517 } 518 519 } 520 521 /** 522 * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas 523 * encapsulating the data packets. 524 */ 525 private class MessageIBBInputStream extends IBBInputStream { 526 527 @Override 528 protected StanzaListener getDataPacketListener() { 529 return new StanzaListener() { 530 531 @Override 532 public void processStanza(Stanza packet) { 533 // get data packet extension 534 DataPacketExtension data = (DataPacketExtension) packet.getExtension( 535 DataPacketExtension.ELEMENT, 536 DataPacketExtension.NAMESPACE); 537 538 // check if encoded data is valid 539 if (data.getDecodedData() == null) { 540 /* 541 * TODO once a majority of XMPP server implementation support XEP-0079 542 * Advanced Message Processing the invalid message could be answered with an 543 * appropriate error. For now we just ignore the packet. Subsequent packets 544 * with an increased sequence will cause the input stream to close the 545 * stream/session. 546 */ 547 return; 548 } 549 550 // data is valid; add to data queue 551 dataQueue.offer(data); 552 553 // TODO confirm packet once XMPP servers support XEP-0079 554 } 555 556 }; 557 } 558 559 @Override 560 protected StanzaFilter getDataPacketFilter() { 561 /* 562 * filter all message stanzas containing a data stanza(/packet) extension, matching session ID 563 * and recipient 564 */ 565 return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter()); 566 } 567 568 } 569 570 /** 571 * IBBDataPacketFilter class filters all packets from the remote peer of this session, 572 * containing an In-Band Bytestream data stanza(/packet) extension whose session ID matches this sessions 573 * ID. 574 */ 575 private class IBBDataPacketFilter implements StanzaFilter { 576 577 @Override 578 public boolean accept(Stanza packet) { 579 // sender equals remote peer 580 if (!packet.getFrom().equals(remoteJID)) { 581 return false; 582 } 583 584 DataPacketExtension data; 585 if (packet instanceof Data) { 586 data = ((Data) packet).getDataPacketExtension(); 587 } else { 588 // stanza contains data packet extension 589 data = packet.getExtension( 590 DataPacketExtension.ELEMENT, 591 DataPacketExtension.NAMESPACE); 592 if (data == null) { 593 return false; 594 } 595 } 596 597 // session ID equals this session ID 598 if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) { 599 return false; 600 } 601 602 return true; 603 } 604 605 } 606 607 /** 608 * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream. 609 * Subclasses of this output stream must provide a method to send data over XMPP stream. 610 */ 611 private abstract class IBBOutputStream extends OutputStream { 612 613 /* buffer with the size of this sessions block size */ 614 protected final byte[] buffer; 615 616 /* pointer to next byte to write to buffer */ 617 protected int bufferPointer = 0; 618 619 /* data packet sequence (range from 0 to 65535) */ 620 protected long seq = 0; 621 622 /* flag to indicate if output stream is closed */ 623 protected boolean isClosed = false; 624 625 /** 626 * Constructor. 627 */ 628 public IBBOutputStream() { 629 this.buffer = new byte[byteStreamRequest.getBlockSize()]; 630 } 631 632 /** 633 * Writes the given data stanza(/packet) to the XMPP stream. 634 * 635 * @param data the data packet 636 * @throws IOException if an I/O error occurred while sending or if the stream is closed 637 * @throws NotConnectedException 638 * @throws InterruptedException 639 */ 640 protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException; 641 642 @Override 643 public synchronized void write(int b) throws IOException { 644 if (this.isClosed) { 645 throw new IOException("Stream is closed"); 646 } 647 648 // if buffer is full flush buffer 649 if (bufferPointer >= buffer.length) { 650 flushBuffer(); 651 } 652 653 buffer[bufferPointer++] = (byte) b; 654 } 655 656 @Override 657 public synchronized void write(byte[] b, int off, int len) throws IOException { 658 if (b == null) { 659 throw new NullPointerException(); 660 } 661 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) 662 || ((off + len) < 0)) { 663 throw new IndexOutOfBoundsException(); 664 } 665 else if (len == 0) { 666 return; 667 } 668 669 if (this.isClosed) { 670 throw new IOException("Stream is closed"); 671 } 672 673 // is data to send greater than buffer size 674 if (len >= buffer.length) { 675 676 // "byte" off the first chunk to write out 677 writeOut(b, off, buffer.length); 678 679 // recursively call this method with the lesser amount 680 write(b, off + buffer.length, len - buffer.length); 681 } 682 else { 683 writeOut(b, off, len); 684 } 685 } 686 687 @Override 688 public synchronized void write(byte[] b) throws IOException { 689 write(b, 0, b.length); 690 } 691 692 /** 693 * Fills the buffer with the given data and sends it over the XMPP stream if the buffers 694 * capacity has been reached. This method is only called from this class so it is assured 695 * that the amount of data to send is <= buffer capacity 696 * 697 * @param b the data 698 * @param off the data 699 * @param len the number of bytes to write 700 * @throws IOException if an I/O error occurred while sending or if the stream is closed 701 */ 702 private synchronized void writeOut(byte[] b, int off, int len) throws IOException { 703 if (this.isClosed) { 704 throw new IOException("Stream is closed"); 705 } 706 707 // set to 0 in case the next 'if' block is not executed 708 int available = 0; 709 710 // is data to send greater that buffer space left 711 if (len > buffer.length - bufferPointer) { 712 // fill buffer to capacity and send it 713 available = buffer.length - bufferPointer; 714 System.arraycopy(b, off, buffer, bufferPointer, available); 715 bufferPointer += available; 716 flushBuffer(); 717 } 718 719 // copy the data left to buffer 720 System.arraycopy(b, off + available, buffer, bufferPointer, len - available); 721 bufferPointer += len - available; 722 } 723 724 @Override 725 public synchronized void flush() throws IOException { 726 if (this.isClosed) { 727 throw new IOException("Stream is closed"); 728 } 729 flushBuffer(); 730 } 731 732 private synchronized void flushBuffer() throws IOException { 733 734 // do nothing if no data to send available 735 if (bufferPointer == 0) { 736 return; 737 } 738 739 // create data packet 740 String enc = Base64.encodeToString(buffer, 0, bufferPointer); 741 DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(), 742 this.seq, enc); 743 744 // write to XMPP stream 745 try { 746 writeToXML(data); 747 } 748 catch (InterruptedException | NotConnectedException e) { 749 IOException ioException = new IOException(); 750 ioException.initCause(e); 751 throw ioException; 752 } 753 754 // reset buffer pointer 755 bufferPointer = 0; 756 757 // increment sequence, considering sequence overflow 758 this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1); 759 760 } 761 762 @Override 763 public void close() throws IOException { 764 if (isClosed) { 765 return; 766 } 767 InBandBytestreamSession.this.closeByLocal(false); 768 } 769 770 /** 771 * Sets the close flag and optionally flushes the stream. 772 * 773 * @param flush if <code>true</code> flushes the stream 774 */ 775 protected void closeInternal(boolean flush) { 776 if (this.isClosed) { 777 return; 778 } 779 this.isClosed = true; 780 781 try { 782 if (flush) { 783 flushBuffer(); 784 } 785 } 786 catch (IOException e) { 787 /* 788 * ignore, because writeToXML() will not throw an exception if stream is already 789 * closed 790 */ 791 } 792 } 793 794 } 795 796 /** 797 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating 798 * the data packets. 799 */ 800 private class IQIBBOutputStream extends IBBOutputStream { 801 802 @Override 803 protected synchronized void writeToXML(DataPacketExtension data) throws IOException { 804 // create IQ stanza containing data packet 805 IQ iq = new Data(data); 806 iq.setTo(remoteJID); 807 808 try { 809 connection.createStanzaCollectorAndSend(iq).nextResultOrThrow(); 810 } 811 catch (Exception e) { 812 // close session unless it is already closed 813 if (!this.isClosed) { 814 InBandBytestreamSession.this.close(); 815 // Sadly we are unable to use the IOException(Throwable) constructor because this 816 // constructor is only supported from Android API 9 on. 817 IOException ioException = new IOException(); 818 ioException.initCause(e); 819 throw ioException; 820 } 821 } 822 823 } 824 825 } 826 827 /** 828 * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas 829 * encapsulating the data packets. 830 */ 831 private class MessageIBBOutputStream extends IBBOutputStream { 832 833 @Override 834 protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException { 835 // create message stanza containing data packet 836 Message message = new Message(remoteJID); 837 message.addExtension(data); 838 839 connection.sendStanza(message); 840 841 } 842 843 } 844 845 /** 846 * Process IQ stanza. 847 * @param data 848 * @throws NotConnectedException 849 * @throws InterruptedException 850 */ 851 public void processIQPacket(Data data) throws NotConnectedException, InterruptedException { 852 inputStream.dataPacketListener.processStanza(data); 853 } 854 855}