Source code for aioxmpp.protocol

"""
:mod:`~aioxmpp.protocol` --- XML Stream implementation
######################################################

This module contains the :class:`XMLStream` class, which implements the XML
stream protocol used by XMPP. It makes extensive use of the :mod:`aioxmpp.xml`
module and the :mod:`aioxmpp.xso` subpackage to parse and serialize XSOs
received and sent on the stream.

In addition, helper functions to work with :class:`XMLStream` instances are
provided; these are not included in the class itself because they provide
additional functionality solely based on the public interface of the
class. Separating them helps with testing.

.. autoclass:: XMLStream

Utilities for XML streams
=========================

.. autofunction:: send_and_wait_for

.. autofunction:: reset_stream_and_get_features

Enumerations
============

.. autoclass:: Mode

.. autoclass:: State

"""

import asyncio
import functools
import inspect
import logging

from enum import Enum

import xml.sax as sax
import xml.parsers.expat as pyexpat

from . import xml, errors, xso, nonza, stanza, callbacks, statemachine
from .utils import namespaces

logger = logging.getLogger(__name__)


class Mode(Enum):
    """
    Connection modes an XML stream can operate in. The value of each mode is
    the XML namespace used for that kind of stream.

    .. attribute:: C2S

       Client-to-server stream. This is the default and, at present, the only
       mode which is available.

    """

    C2S = namespaces.client
@functools.total_ordering
class State(Enum):
    """
    The possible states of a :class:`XMLStream`. States are ordered by their
    value, so they can be compared with ``<``, ``<=``, ``>`` and ``>=``.

    .. attribute:: READY

       The initial state; this is the case when no underlying transport is
       connected.

    .. attribute:: STREAM_HEADER_SENT

       After a :class:`asyncio.Transport` calls
       :meth:`XMLStream.connection_made` on the xml stream, it sends the
       stream header and enters this state.

    .. attribute:: OPEN

       When the stream header of the peer is received, this state is entered
       and the XML stream can be used for sending and receiving XSOs.

    .. attribute:: CLOSING

       After :meth:`XMLStream.close` is called, this state is entered. We sent
       a stream footer and an EOF, if the underlying transport supports this.
       We still have to wait for the peer to close the stream.

       In this state and all following states, :class:`ConnectionError`
       instances are raised whenever an attempt is made to write to the
       stream. The exact instance depends on the reason of the closure.

       In this state, the stream waits for the remote to send a stream footer
       and the connection to shut down. For application purposes, the stream
       is already closed.

    .. attribute:: CLOSING_STREAM_FOOTER_RECEIVED

       At this point, the stream is properly closed on the XML stream
       level. This is the point where :meth:`XMLStream.close_and_wait`
       returns.

    .. attribute:: CLOSED

       This state is entered when the connection is lost in any way. This is
       the final state.

    """

    def __lt__(self, other):
        # Only states can be ordered against each other; for anything else,
        # let Python try the reflected operation / raise TypeError instead of
        # failing with an AttributeError on ``other.value``.
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented

    READY = 0
    STREAM_HEADER_SENT = 1
    OPEN = 2
    CLOSING = 3
    CLOSING_STREAM_FOOTER_RECEIVED = 4
    CLOSED = 6
class DebugWrapper:
    """
    Write-through wrapper around `dest` which logs everything that was
    written since the last flush.

    `dest` must provide a ``write`` method; a ``flush`` method is optional.
    `logger` must provide a ``debug`` method compatible with
    :meth:`logging.Logger.debug`.
    """

    def __init__(self, dest, logger):
        self.dest = dest
        self.logger = logger
        # Destinations without flush() get a no-op so that flush() below does
        # not need to special-case them.
        self._flush = getattr(dest, "flush", lambda: None)
        self._pieces = []

    def write(self, data):
        """
        Record `data` for the next debug log line and pass it on to the
        wrapped destination.
        """
        self._pieces.append(data)
        self.dest.write(data)

    def flush(self):
        """
        Log the bytes written since the previous flush and flush the wrapped
        destination, if it supports flushing.
        """
        self.logger.debug("SENT %r", b"".join(self._pieces))
        self._pieces = []
        # The original evaluated ``self._flush`` without calling it, so the
        # wrapped destination was never flushed.
        self._flush()
class XMLStream(asyncio.Protocol):
    """
    XML stream implementation. This is a streaming :class:`asyncio.Protocol`
    which translates the received bytes into XSOs.

    `to` must be a domain :class:`~aioxmpp.structs.JID` which identifies the
    domain to which the stream shall connect.

    `features_future` must be a :class:`asyncio.Future` instance; the XML
    stream will set the first :class:`~aioxmpp.nonza.StreamFeatures` node it
    receives as the result of the future.

    `sorted_attributes` is mainly for unittesting purposes; this is an
    argument to the :class:`~aioxmpp.xml.XMPPXMLGenerator` and slows down the
    XML serialization, but produces deterministic results, which is important
    for testing. Generally, it is preferred to leave this argument at its
    default.

    `base_logger` may be a :class:`logging.Logger` instance to use. The XML
    stream will create a child called ``XMLStream`` at that logger and use
    that child for logging purposes. This eases debugging and allows for
    connection-specific loggers.

    `loop` is the event loop to use for the futures and tasks created by the
    stream; if it is omitted, :func:`asyncio.get_event_loop` is used.

    Receiving XSOs:

    .. attribute:: stanza_parser

       A :class:`~aioxmpp.xso.XSOParser` instance which is wired to a
       :class:`~aioxmpp.xml.XMPPXMLProcessor` which processes the received
       bytes.

       To receive XSOs over the XML stream, use :attr:`stanza_parser` and
       register class callbacks on it using
       :meth:`~aioxmpp.xso.XSOParser.add_class`.

    .. attribute:: error_handler

       This should be assigned a callable, taking two arguments: a
       :class:`xso.XSO` instance, which is the partial(!) top-level stream
       element and an exception indicating the failure.

       Partial here means that it is not guaranteed that anything but the
       attributes on the partial XSO itself are there. Any children or text
       payload is most likely missing, as it probably caused the error.

       .. versionadded:: 0.4

    Sending XSOs:

    .. automethod:: send_xso

    Manipulating stream state:

    .. automethod:: starttls

    .. automethod:: reset

    .. automethod:: close

    .. automethod:: abort

    Signals:

    .. signal:: on_closing

       A :class:`~aioxmpp.callbacks.Signal` which fires when the underlying
       transport of the stream reports an error or when a stream error is
       received. The signal is fired with the corresponding exception as the
       only argument.

       If the stream gets closed by the application without any error, the
       argument is :data:`None`.

       By the time the callback fires, the stream is already unusable for
       sending stanzas. It *may* however still receive stanzas, if the stream
       shutdown was initiated by the application and the peer has not yet
       sent its stream footer.

       If the application is not able to handle these stanzas, it is
       legitimate to disconnect their handlers from the
       :attr:`stanza_parser`; the stream will be able to deal with unhandled
       top level stanzas correctly at this point (by ignoring them).

    Timeouts:

    .. attribute:: shutdown_timeout

       The maximum time to wait for the peer ``</stream:stream>`` before
       forcing to close the transport and considering the stream closed.

    """

    on_closing = callbacks.Signal()

    shutdown_timeout = 15

    def __init__(self, to,
                 features_future,
                 sorted_attributes=False,
                 base_logger=logging.getLogger("aioxmpp"),
                 loop=None):
        self._to = to
        self._sorted_attributes = sorted_attributes
        self._logger = base_logger.getChild("XMLStream")
        self._transport = None
        self._features_future = features_future
        self._exception = None
        self._loop = loop or asyncio.get_event_loop()
        self._error_futures = []
        self._smachine = statemachine.OrderedStateMachine(State.READY)
        self._transport_closing = False

        # Watch for the CLOSING state; the done callback notifies listeners
        # and arms the stream footer timeout. ``asyncio.async`` cannot be
        # used here: ``async`` is a reserved word since Python 3.7.
        self._closing_future = asyncio.ensure_future(
            self._smachine.wait_for(
                State.CLOSING
            ),
            loop=self._loop
        )
        self._closing_future.add_done_callback(
            self._stream_starts_closing
        )

        self.stanza_parser = xso.XSOParser()
        self.stanza_parser.add_class(nonza.StreamError,
                                     self._rx_stream_error)
        self.stanza_parser.add_class(nonza.StreamFeatures,
                                     self._rx_stream_features)
        self.error_handler = None

    def _invalid_transition(self, to, via=None):
        """
        Build (but do not raise) a :class:`RuntimeError` describing an
        invalid transition from the current state to `to`.
        """
        text = "invalid state transition: from={} to={}".format(
            self._smachine.state,
            to)
        if via:
            text += " (via: {})".format(via)
        return RuntimeError(text)

    def _invalid_state(self, at=None):
        """
        Build (but do not raise) a :class:`RuntimeError` describing that the
        current state is invalid for the operation `at`.
        """
        text = "invalid state: {}".format(self._smachine.state)
        if at:
            text += " (at: {})".format(at)
        return RuntimeError(text)

    def _close_transport(self):
        """
        Close the transport; only the first call has an effect.
        """
        if self._transport_closing:
            return
        self._transport_closing = True
        self._transport.close()

    def _stream_starts_closing(self, task):
        """
        Done callback of the task which waits for the CLOSING state.

        Fires :attr:`on_closing`, fails all pending error futures and, if the
        CLOSING state was actually reached, starts the stream footer timeout.
        """
        exc = self._exception
        if exc is None:
            exc = ConnectionError("stream shut down")
        # The signal receives the stored exception, which is None on a clean
        # shutdown; only the error futures get the generic ConnectionError.
        self.on_closing(self._exception)
        for fut in self._error_futures:
            if not fut.done():
                fut.set_exception(exc)
        self._error_futures.clear()

        if task.cancelled():
            # this happens if connection_lost happens before we enter closing
            # state. causes are: stream abortion, connection reset by peer etc.
            # no need to wait for a timeout in that case
            return

        if task.exception() is not None:
            # this happens if we skip over the CLOSING state, which implies
            # that the stream footer has been seen; no reason to worry about
            # the timeout in that case.
            return

        task.result()
        asyncio.ensure_future(
            self._stream_footer_timeout(),
            loop=self._loop
        ).add_done_callback(lambda x: x.result())

    @asyncio.coroutine
    def _stream_footer_timeout(self):
        """
        Wait :attr:`shutdown_timeout` seconds and, if the peer stream footer
        has not arrived by then, close the transport and proceed as if it had
        been received.
        """
        self._logger.debug(
            "waiting for at most %s seconds for peer stream footer",
            self.shutdown_timeout
        )
        yield from asyncio.sleep(self.shutdown_timeout)
        if self._smachine.state >= State.CLOSING_STREAM_FOOTER_RECEIVED:
            # state already reached, stop
            return
        self._logger.info("timeout while waiting for stream footer")
        self._close_transport()
        self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED

    def _fail(self, err):
        """
        Record `err` as the reason for the stream failure and close the
        stream.
        """
        self._exception = err
        self.close()

    def _require_connection(self, accept_partial=False):
        """
        Return normally if the stream is OPEN (or, with `accept_partial`,
        STREAM_HEADER_SENT). Otherwise raise the stored exception, if any, or
        a :class:`ConnectionError`.
        """
        if (self._smachine.state == State.OPEN or
                (accept_partial and
                 self._smachine.state == State.STREAM_HEADER_SENT)):
            return

        if self._exception:
            raise self._exception

        raise ConnectionError("xmlstream not connected")

    def _rx_exception(self, exc):
        """
        Handle an exception reported by the XML processor.

        Stanza errors are forwarded to :attr:`error_handler` (if set);
        unknown top level tags are ignored while closing and turned into an
        ``unsupported-stanza-type`` stream error otherwise; everything else
        is re-raised.
        """
        if isinstance(exc, stanza.StanzaError):
            if self.error_handler:
                self.error_handler(exc.partial_obj, exc)
        elif isinstance(exc, xso.UnknownTopLevelTag):
            if self._smachine.state >= State.CLOSING:
                self._logger.info("ignoring unknown top-level tag, "
                                  "we’re closing")
                return

            raise errors.StreamError(
                condition=(namespaces.streams, "unsupported-stanza-type"),
                text="unsupported stanza: {}".format(
                    xso.tag_to_str((exc.ev_args[0], exc.ev_args[1]))
                )) from None
        else:
            context = exc.__context__ or exc.__cause__
            raise exc from context

    def _rx_stream_header(self):
        """
        Called when the peer stream header was received; validates the
        version and moves the stream to OPEN.
        """
        if self._processor.remote_version != (1, 0):
            raise errors.StreamError(
                (namespaces.streams, "unsupported-version"),
                text="unsupported version")
        self._smachine.state = State.OPEN

    def _rx_stream_error(self, err):
        """
        Called when a stream error XSO was received; fails the stream with
        the corresponding exception.
        """
        self._fail(err.to_exception())

    def _rx_stream_footer(self):
        """
        Called when the peer stream footer was received.
        """
        if self._smachine.state < State.CLOSING:
            # any other state, this is an issue
            if self._exception is None:
                self._fail(ConnectionError("stream closed by peer"))
            self.close()
        elif self._smachine.state >= State.CLOSING_STREAM_FOOTER_RECEIVED:
            self._logger.info("late stream footer received")
            return

        self._close_transport()
        self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED

    def _rx_stream_features(self, features):
        """
        Deliver the first received stream features to the features future
        and stop listening for further ones.
        """
        self.stanza_parser.remove_class(nonza.StreamFeatures)
        self._features_future.set_result(features)
        self._features_future = None

    def _rx_feed(self, blob):
        """
        Feed `blob` to the XML parser and translate any failure into a
        :class:`~aioxmpp.errors.StreamError`.
        """
        try:
            self._parser.feed(blob)
        except sax.SAXParseException as exc:
            if (exc.getException().args[0].startswith(
                    pyexpat.errors.XML_ERROR_UNDEFINED_ENTITY)):
                # this will raise an appropriate stream error
                xml.XMPPLexicalHandler.startEntity("foo")
            raise errors.StreamError(
                condition=(namespaces.streams, "bad-format"),
                text=str(exc)
            )
        except errors.StreamError:
            raise
        except Exception:
            self._logger.exception(
                "unexpected exception while parsing stanza"
                " bubbled up through parser. stream so ded.")
            raise errors.StreamError(
                condition=(namespaces.streams, "internal-server-error"),
                text="Internal error while parsing XML. Client logs have more"
                     " details."
            )

    def connection_made(self, transport):
        """
        :class:`asyncio.Protocol` callback: take over `transport` and send
        the stream header. Raises :class:`RuntimeError` if the stream is not
        in READY state.
        """
        if self._smachine.state != State.READY:
            raise self._invalid_state("connection_made")

        assert self._transport is None
        self._transport = transport
        self._writer = None
        self._exception = None
        # we need to set the state before we call reset()
        self._smachine.state = State.STREAM_HEADER_SENT
        self.reset()

    def connection_lost(self, exc):
        """
        :class:`asyncio.Protocol` callback: move to CLOSED and drop all
        transport related state.
        """
        # in connection_lost, we really cannot do anything except shutting down
        # the stream without sending any more data
        if self._smachine.state == State.CLOSED:
            return
        self._smachine.state = State.CLOSED
        # an exception recorded earlier (e.g. a stream error) takes precedence
        # over the one reported by the transport
        self._exception = self._exception or exc
        self._kill_state()
        self._writer = None
        self._transport = None
        self._closing_future.cancel()

    def data_received(self, blob):
        """
        :class:`asyncio.Protocol` callback: parse `blob`; on a stream error,
        send it to the peer, fail the stream and close the transport.
        """
        self._logger.debug("RECV %r", blob)
        try:
            self._rx_feed(blob)
        except errors.StreamError as exc:
            stanza_obj = nonza.StreamError.from_exception(exc)
            try:
                self._writer.send(stanza_obj)
            except StopIteration:
                pass
            self._fail(exc)
            # shutdown, we do not really care about </stream:stream> by the
            # server at this point
            self._close_transport()

    def eof_received(self):
        """
        :class:`asyncio.Protocol` callback: treat the EOF like a received
        stream footer and close the transport.
        """
        if self._smachine.state == State.OPEN:
            # close and set to EOF received
            self.close()
            # common actions below
        elif (self._smachine.state == State.CLOSING or
              self._smachine.state == State.CLOSING_STREAM_FOOTER_RECEIVED):
            # these states are fine, common actions below
            pass
        else:
            # Logger.warn is a deprecated alias of Logger.warning
            self._logger.warning("unexpected eof_received (in %s state)",
                                 self._smachine.state)
            # common actions below
        self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED
        self._close_transport()

    def close(self):
        """
        Close the XML stream and the underlying transport.

        This gracefully shuts down the XML stream and the transport, if
        possible by writing the eof using
        :meth:`asyncio.Transport.write_eof` after sending the stream footer.

        After a call to :meth:`close`, no other stream manipulating or sending
        method can be called; doing so will result in a
        :class:`ConnectionError` exception or any exception caused by the
        transport during shutdown.

        Calling :meth:`close` while the stream is closing or closed is a
        no-op.
        """
        if (self._smachine.state == State.CLOSING or
                self._smachine.state == State.CLOSED):
            return
        self._writer.close()
        if self._transport.can_write_eof():
            self._transport.write_eof()
        if self._smachine.state == State.STREAM_HEADER_SENT:
            # at this point, we cannot wait for the peer to send
            # </stream:stream>
            self._close_transport()
        self._smachine.state = State.CLOSING

    @asyncio.coroutine
    def close_and_wait(self):
        """
        Close the XML stream and the underlying transport and wait for the
        XML stream to be properly terminated.

        The underlying transport may still be open when this coroutine
        returns, but closing has already been initiated.

        The other remarks about :meth:`close` hold.
        """
        self.close()
        yield from self._smachine.wait_for_at_least(
            State.CLOSING_STREAM_FOOTER_RECEIVED
        )

    def _kill_state(self):
        """
        Abort the writer generator (if it is suspended) and discard the
        parser and processor.
        """
        if self._writer:
            if inspect.getgeneratorstate(self._writer) == "GEN_SUSPENDED":
                try:
                    self._writer.throw(xml.AbortStream())
                except StopIteration:
                    pass
            else:
                self._writer = None

        self._processor = None
        self._parser = None

    def _reset_state(self):
        """
        Discard the current parser/writer state and create a fresh
        processor, parser and writer for the current transport.
        """
        self._kill_state()

        self._processor = xml.XMPPXMLProcessor()
        self._processor.stanza_parser = self.stanza_parser
        self._processor.on_stream_header = self._rx_stream_header
        self._processor.on_stream_footer = self._rx_stream_footer
        self._processor.on_exception = self._rx_exception
        self._parser = xml.make_parser()
        self._parser.setContentHandler(self._processor)

        # Only pay for recording the sent bytes if they would be logged.
        if self._logger.getEffectiveLevel() <= logging.DEBUG:
            dest = DebugWrapper(self._transport, self._logger)
        else:
            dest = self._transport

        self._writer = xml.write_xmlstream(
            dest,
            self._to,
            nsmap={None: "jabber:client"},
            sorted_attributes=self._sorted_attributes)

    def reset(self):
        """
        Reset the stream by discarding all state and re-sending the stream
        header.

        Calling :meth:`reset` when the stream is disconnected or currently
        disconnecting results in either :class:`ConnectionError` being raised
        or the exception which caused the stream to die (possibly a received
        stream error or a transport error) to be reraised.

        :meth:`reset` puts the stream into :attr:`~State.STREAM_HEADER_SENT`
        state and it cannot be used for sending XSOs until the peer stream
        header has been received. Usually, this is not a problem as stream
        resets only occur during stream negotiation and stream negotiation
        typically waits for the peer's feature node to arrive first.
        """
        self._require_connection(accept_partial=True)
        self._reset_state()
        # advance the writer generator to its first yield
        next(self._writer)
        self._smachine.rewind(State.STREAM_HEADER_SENT)

    def abort(self):
        """
        Abort the stream by writing an EOF if possible and closing the
        transport.

        The transport is closed using :meth:`asyncio.BaseTransport.close`, so
        buffered data is sent, but no more data will be received. The stream
        is in :attr:`State.CLOSED` state afterwards.

        This also works if the stream is currently closing, that is, waiting
        for the peer to send a stream footer. In that case, the stream will be
        closed locally as if the stream footer had been received.

        .. versionadded:: 0.5
        """
        if self._smachine.state == State.CLOSED:
            return
        if (self._smachine.state != State.CLOSING and
                self._transport.can_write_eof()):
            self._transport.write_eof()
        self._close_transport()

    def send_xso(self, obj):
        """
        Send an XSO `obj` over the stream.

        Calling :meth:`send_xso` while the stream is disconnected,
        disconnecting or still waiting for the remote to send a stream header
        causes :class:`ConnectionError` to be raised. If the stream got
        disconnected due to a transport or stream error, that exception is
        re-raised instead of the :class:`ConnectionError`.
        """
        self._require_connection()
        self._writer.send(obj)

    def can_starttls(self):
        """
        Return true if the transport supports STARTTLS and false otherwise.

        If the stream is currently not connected, this returns false.
        """
        return (hasattr(self._transport, "can_starttls") and
                self._transport.can_starttls())

    @asyncio.coroutine
    def starttls(self, ssl_context, post_handshake_callback=None):
        """
        Start TLS on the transport and wait for it to complete.

        The `ssl_context` and `post_handshake_callback` arguments are
        forwarded to the transport's
        :meth:`aioopenssl.STARTTLSTransport.starttls` coroutine method.

        If the transport does not support starttls, :class:`RuntimeError` is
        raised; support for starttls can be discovered by querying
        :meth:`can_starttls`.

        After :meth:`starttls` returns, you must call :meth:`reset`. Any other
        method may fail in interesting ways as the internal state is discarded
        when starttls succeeds, for security reasons. :meth:`reset` re-creates
        the internal structures.
        """
        self._require_connection()
        if not self.can_starttls():
            raise RuntimeError("starttls not available on transport")

        yield from self._transport.starttls(ssl_context,
                                            post_handshake_callback)
        self._reset_state()

    def error_future(self):
        """
        Return a future which will receive the next XML stream error as
        exception.

        It is safe to cancel the future at any time.
        """
        fut = asyncio.Future(loop=self._loop)
        self._error_futures.append(fut)
        return fut

    @property
    def transport(self):
        """
        The underlying :class:`asyncio.Transport` instance. This attribute is
        :data:`None` if the :class:`XMLStream` is currently not connected.

        This attribute cannot be set.
        """
        return self._transport

    @property
    def state(self):
        """
        The current :class:`State` of the XML stream.

        This attribute cannot be set.
        """
        return self._smachine.state
@asyncio.coroutine
def send_and_wait_for(xmlstream, send, wait_for, timeout=None):
    """
    Send the XSOs from the iterable `send` over `xmlstream` and wait until
    an XSO of one of the classes in `wait_for` is received.

    For the duration of the wait, a callback is registered for each class in
    `wait_for` at the :attr:`~XMLStream.stanza_parser` of the stream; the
    callbacks are removed again before this coroutine returns or raises.

    Return the received XSO. If the error future of the stream
    (:meth:`XMLStream.error_future`) completes first, its exception is
    re-raised. If neither happens within `timeout` seconds (:data:`None`
    means no limit), :class:`TimeoutError` is raised.
    """
    # Bind the future to the loop of the stream: it is awaited together with
    # the error future (which lives on that loop) using that very loop.
    fut = asyncio.Future(loop=xmlstream._loop)
    wait_for = list(wait_for)

    def cleanup():
        for anticipated_cls in wait_for:
            xmlstream.stanza_parser.remove_class(anticipated_cls)

    def receive(obj):
        fut.set_result(obj)
        # unregister right away so that no second XSO hits the done future
        cleanup()

    failure_future = xmlstream.error_future()

    for anticipated_cls in wait_for:
        xmlstream.stanza_parser.add_class(
            anticipated_cls,
            receive)

    try:
        for to_send in send:
            xmlstream.send_xso(to_send)

        done, pending = yield from asyncio.wait(
            [
                fut,
                failure_future,
            ],
            timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED,
            loop=xmlstream._loop)

        for other_fut in pending:
            other_fut.cancel()

        if fut in done:
            return fut.result()

        if failure_future in done:
            # re-raises the stream error
            failure_future.result()

        raise TimeoutError()
    except BaseException:
        # also covers cancellation of this coroutine; the handlers must not
        # stay registered in any case
        cleanup()
        raise
@asyncio.coroutine
def reset_stream_and_get_features(xmlstream, timeout=None):
    """
    Reset `xmlstream` using :meth:`XMLStream.reset` and wait for the peer to
    send its :class:`~aioxmpp.nonza.StreamFeatures`.

    For the duration of the wait, a callback for the stream features class
    is registered at the :attr:`~XMLStream.stanza_parser` of the stream; it
    is removed again before this coroutine returns or raises.

    Return the received stream features. If the error future of the stream
    (:meth:`XMLStream.error_future`) completes first, its exception is
    re-raised. If neither happens within `timeout` seconds (:data:`None`
    means no limit), :class:`TimeoutError` is raised.
    """
    # Bind the future to the loop of the stream: it is awaited together with
    # the error future (which lives on that loop) using that very loop.
    fut = asyncio.Future(loop=xmlstream._loop)

    def cleanup():
        xmlstream.stanza_parser.remove_class(nonza.StreamFeatures)

    def receive(obj):
        fut.set_result(obj)
        cleanup()

    failure_future = xmlstream.error_future()

    xmlstream.stanza_parser.add_class(
        nonza.StreamFeatures,
        receive)

    try:
        xmlstream.reset()

        done, pending = yield from asyncio.wait(
            [
                fut,
                failure_future,
            ],
            timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED,
            loop=xmlstream._loop)

        for other_fut in pending:
            other_fut.cancel()

        if fut in done:
            return fut.result()

        if failure_future in done:
            # re-raises the stream error
            failure_future.result()

        raise TimeoutError()
    except BaseException:
        # also covers cancellation of this coroutine; the handler must not
        # stay registered in any case
        cleanup()
        raise
def send_stream_error_and_close(
        xmlstream,
        condition,
        text,
        custom_condition=None):
    """
    Send a stream error with the given `condition` and `text` over
    `xmlstream` and close the stream afterwards.

    `custom_condition` is accepted but not implemented; if it is not
    :data:`None`, a warning is logged and the argument is otherwise ignored.
    """
    xmlstream.send_xso(nonza.StreamError(
        condition=condition,
        text=text))
    if custom_condition is not None:
        # Logger.warn is a deprecated alias of Logger.warning
        logger.warning(
            "custom_condition argument to send_stream_error_and_close"
            " not implemented")
    xmlstream.close()