Source code for aioxmpp.node

"""
:mod:`~aioxmpp.node` --- XMPP network nodes (clients, mostly)
#############################################################

This module contains functions to connect to an XMPP server, as well as
maintaining the stream. In addition, a client class which completely manages a
stream based on a presence setting is provided.

Using XMPP
==========

.. autoclass:: AbstractClient

.. autoclass:: PresenceManagedClient

Connecting streams low-level
============================

.. autofunction:: connect_secured_xmlstream

.. autofunction:: connect_to_xmpp_server

"""
import asyncio
import contextlib
import logging

from datetime import timedelta

from . import (
    network,
    ssl_transport,
    protocol,
    errors,
    stream,
    callbacks,
    stream_xsos,
    rfc3921,
    rfc6120,
    stanza,
    structs,
)
from .utils import namespaces


def lookup_addresses(loop, jid):
    addresses = yield from network.find_xmpp_host_addr(
        loop,
        jid.domain)

    return network.group_and_order_srv_records(addresses)


@asyncio.coroutine
[docs]def connect_to_xmpp_server(jid, *, override_peer=None, loop=None): """ Connect to an XMPP server which serves the domain of the given `jid`. `override_peer` may be a tuple of host name (or IP address) and port. If given, this will be the first peer the stream tries to connect to. Only if that connection fails the usual XMPP server lookup routine takes place. `loop` must be either a valid :class:`asyncio.BaseEventLoop` or :data:`None`, in which case the current event loop is used. Return a triple consisting of the `transport`, the :class:`~aioxmpp.protocol.XMLStream` instance and a :class:`asyncio.Future` on the first :class:`~aioxmpp.stream_xsos.StreamFeatures` node. If the connection fails :class:`OSError` is raised. That OSError may in fact be a :class:`~aioxmpp.errors.MultiOSError`, which gives more information on the different errors which occured. If the domain does not support XMPP at all (by indicating that fact in the SRV records), :class:`ValueError` is raised. """ loop = loop or asyncio.get_event_loop() features_future = asyncio.Future() xmlstream = protocol.XMLStream( to=jid.domain, features_future=features_future) exceptions = [] if override_peer is not None: host, port = override_peer try: transport, _ = yield from ssl_transport.create_starttls_connection( loop, lambda: xmlstream, host=host, port=port, peer_hostname=host, server_hostname=jid.domain, use_starttls=True) except OSError as exc: exceptions.append(exc) else: return transport, xmlstream, features_future for host, port in (yield from lookup_addresses(loop, jid)): try: transport, _ = yield from ssl_transport.create_starttls_connection( loop, lambda: xmlstream, host=host, port=port, peer_hostname=host, server_hostname=jid.domain, use_starttls=True) except OSError as exc: exceptions.append(exc) else: break else: if not exceptions: # domain does not support XMPP (no options at all to connect to it) raise OSError( "no options to connect to {}".format(jid.domain) ) if len(exceptions) == 1: raise exceptions[0] raise errors.MultiOSError( "failed to connect to server for {}".format(jid), exceptions) return transport, xmlstream, features_future
@asyncio.coroutine
[docs]def connect_secured_xmlstream(jid, security_layer, negotiation_timeout=1.0, override_peer=None, loop=None): """ Connect to an XMPP server which serves the domain of the given `jid` and apply the given `security_layer` (see :func:`~aioxmpp.security_layer.security_layer`). `loop` must be either a valid :class:`asyncio.BaseEventLoop` or :data:`None`, in which case the current event loop is used. The `negotiation_timeout` is passed to the security layer and used for connect timeouts. `override_peer` is passed to :func:`connect_to_xmpp_server`. Return a triple consisting of the `transport`, the :class:`~aioxmpp.protocol.XMLStream` and the current :class:`~aioxmpp.stream_xsos.StreamFeatures` node. The `transport` returned in the triple is the one returned by the security layer and is :data:`None` if no starttls has been negotiated. To gain access to the transport used if the transport returned is :data:`None`, use the :attr:`~aioxmpp.protocol.XMLStream.transport` of the XML stream. If the connection fails :class:`OSError` is raised. That OSError may in fact be a :class:`~aioxmpp.errors.MultiOSError`, which gives more information on the different errors which occured. If the domain does not support XMPP at all (by indicating that fact in the SRV records), :class:`ValueError` is raised. If SASL or TLS negotiation fails, the corresponding exception type from :mod:`aioxmpp.errors` is raised. Most notably, authentication failures caused by invalid credentials or a user abort are raised as :class:`~aioxmpp.errors.AuthenticationFailure`. """ try: transport, xmlstream, features_future = yield from asyncio.wait_for( connect_to_xmpp_server(jid, override_peer=override_peer, loop=loop), timeout=negotiation_timeout, loop=loop ) except asyncio.TimeoutError: raise TimeoutError("connection to {} timed out".format(jid)) features = yield from features_future try: new_transport, features = yield from security_layer( negotiation_timeout, jid, features, xmlstream) except errors.SASLUnavailable as exc: protocol.send_stream_error_and_close( xmlstream, condition=(namespaces.streams, "policy-violation"), text=str(exc) ) raise except errors.TLSUnavailable as exc: protocol.send_stream_error_and_close( xmlstream, condition=(namespaces.streams, "policy-violation"), text=str(exc) ) raise except errors.SASLFailure as exc: protocol.send_stream_error_and_close( xmlstream, condition=(namespaces.streams, "undefined-condition"), text=str(exc) ) raise return new_transport, xmlstream, features
[docs]class AbstractClient: """ The :class:`AbstractClient` is a base class for implementing XMPP client classes. These classes deal with managing the :class:`~aioxmpp.stream.StanzaStream` and the underlying :class:`~aioxmpp.protocol.XMLStream` instances. The abstract client provides functionality for connecting the xmlstream as well as signals which indicate changes in the stream state. The `jid` must be a :class:`~aioxmpp.structs.JID` for which to connect. The `security_layer` is best created using the :func:`~aioxmpp.security_layer.security_layer` function and must provide authentication for the given `jid`. The `negotiation_timeout` argument controls the :attr:`negotiation_timeout` attribute. If `loop` is given, it must be a :class:`asyncio.BaseEventLoop` instance. If it is not given, the current event loop is used. As a glue between the stanza stream and the XML stream, it also knows about stream management and performs stream management negotiation. It is specialized on client operations, which implies that it will try to keep the stream alive as long as wished by the client. In general, there are no fatal errors (aside from stream negotiation problems) which stop a :class:`AbstractClient` from working. It makes use of stream management as far as possible and abstracts away the gritty low level details. In general, it is sufficient to observe the :attr:`on_stream_established` and :attr:`on_stream_destroyed` events, which notify a user about when a stream becomes available and when it becomes unavailable. If authentication fails (or another stream negotiation error occurs), the client fails and :attr:`on_failure` is fired. :attr:`running` becomes false and the client needs to be re-started manually by calling :meth:`start`. .. versionchanged:: 0.4 Since 0.4, support for legacy XMPP sessions has been implemented. Mainly for compatiblity with ejabberd. Controlling the client: .. automethod:: start .. automethod:: stop .. autoattribute:: running .. attribute:: negotiation_timeout = timedelta(seconds=60) The timeout applied to the connection process and the individual steps of negotiating the stream. See the `negotiation_timeout` argument to :func:`connect_secured_xmlstream`. Connection information: .. autoattribute:: established .. autoattribute:: local_jid .. attribute:: stream The :class:`~aioxmpp.stream.StanzaStream` instance used by the node. .. attribute:: stream_features An instance of :class:`~aioxmpp.stream_xsos.StreamFeatures`. This is the most-recently received stream features information (the one received right before resource binding). While no stream has been established yet, this is :data:`None`. During transparent re-negotiation, that information may be obsolete. However, when :attr:`before_stream_established` fires, the information is up-to-date. Exponential backoff on interruptions: .. attribute:: backoff_start When an underlying XML stream fails due to connectivity issues (generic :class:`OSError` raised), exponential backoff takes place before attempting to reconnect. The initial time to wait before reconnecting is described by :attr:`backoff_start`. .. attribute:: backoff_factor Each subsequent time a connection fails, the previous backoff time is multiplied with :attr:`backoff_factor`. .. attribute:: backoff_cap The backoff time is capped to :attr:`backoff_cap`, to avoid having unrealistically high values. Signals: .. signal:: on_failure() This signal is fired when the client fails and stops. .. syncsignal:: before_stream_established() This coroutine signal is executed right before :meth:`on_stream_established` fires. .. signal:: on_stopped() Fires when the client stops gracefully. This is the counterpart to :meth:`on_failure`. .. signal:: on_stream_established() When the stream is established and resource binding took place, this event is fired. It means that the stream can now be used for XMPP interactions. .. signal:: on_stream_destroyed() This is called whenever a stream is destroyed. The conditions for this are the same as for :attr:`.StanzaStream.on_stream_destroyed`. This event can be used to know when to discard all state about the XMPP connection, such as roster information. Services: .. automethod:: summon """ on_failure = callbacks.Signal() on_stopped = callbacks.Signal() on_stream_destroyed = callbacks.Signal() on_stream_established = callbacks.Signal() before_stream_established = callbacks.SyncSignal() def __init__(self, local_jid, security_layer, negotiation_timeout=timedelta(seconds=60), loop=None): super().__init__() self._local_jid = local_jid self._loop = loop or asyncio.get_event_loop() self._main_task = None self._security_layer = security_layer self._failure_future = asyncio.Future() self._logger = logging.getLogger("aioxmpp.AbstractClient") self._backoff_time = None self._established = False self._services = {} self.stream_features = None self.negotiation_timeout = negotiation_timeout self.backoff_start = timedelta(seconds=1) self.backoff_factor = 1.2 self.backoff_cap = timedelta(seconds=60) self.stream = stream.StanzaStream(local_jid.bare()) def _stream_failure(self, exc): self._failure_future.set_result(exc) self._failure_future = asyncio.Future() def _stream_destroyed(self): if self._established: self._established = False self.on_stream_destroyed() def _on_bind_done(self, task): try: task.result() except asyncio.CancelledError: pass except Exception as err: self._logger.error("resource binding failed: %r", err) self._main_task.cancel() self.on_failure(err) def _on_main_done(self, task): try: task.result() except asyncio.CancelledError: # task terminated normally self.on_stopped() except Exception as err: self._logger.exception("main failed") self.on_failure(err) @asyncio.coroutine def _try_resume_stream_management(self, xmlstream, features): try: yield from self.stream.resume_sm(xmlstream) except errors.StreamNegotiationFailure: return False return True @asyncio.coroutine def _negotiate_legacy_session(self): self._logger.debug( "remote server announces support for legacy sessions" ) yield from self.stream.send_iq_and_wait_for_reply( stanza.IQ(type_="set", payload=rfc3921.Session()) ) self._logger.debug( "legacy session negotiated (upgrade your server!)" ) @asyncio.coroutine def _negotiate_stream(self, xmlstream, features): server_can_do_sm = True try: features[stream_xsos.StreamManagementFeature] except KeyError: if self.stream.sm_enabled: self._logger.warn("server isn’t advertising SM anymore") self.stream.stop_sm() server_can_do_sm = False self._logger.debug("negotiating stream (server_can_do_sm=%s)", server_can_do_sm) if self.stream.sm_enabled: resumed = yield from self._try_resume_stream_management( xmlstream, features) if resumed: return features, resumed else: resumed = False self.stream_features = features self.stream.start(xmlstream) if not resumed: self._logger.debug("binding to resource") yield from self._bind() if server_can_do_sm: self._logger.debug("attempting to start stream management") try: yield from self.stream.start_sm() except errors.StreamNegotiationFailure: self._logger.debug("stream management failed to start") self._logger.debug("stream management started") try: features[rfc3921.SessionFeature] except KeyError: pass # yay else: yield from self._negotiate_legacy_session() self._established = True yield from self.before_stream_established() self.on_stream_established() return features, resumed @asyncio.coroutine def _bind(self): iq = stanza.IQ(type_="set") iq.payload = rfc6120.Bind(resource=self._local_jid.resource) try: result = yield from self.stream.send_iq_and_wait_for_reply(iq) except errors.XMPPError as exc: raise errors.StreamNegotiationFailure( "Resource binding failed: {}".format(exc) ) self._local_jid = result.jid self._logger.info("bound to jid: %s", self._local_jid) @asyncio.coroutine def _main_impl(self): failure_future = self._failure_future override_peer = None if self.stream.sm_enabled: override_peer = self.stream.sm_location if override_peer: override_peer = str(override_peer[0]), override_peer[1] tls_transport, xmlstream, features = \ yield from connect_secured_xmlstream( self._local_jid, self._security_layer, negotiation_timeout=self.negotiation_timeout.total_seconds(), override_peer=override_peer, loop=self._loop) try: features, sm_resumed = yield from self._negotiate_stream( xmlstream, features) self._backoff_time = None exc = yield from failure_future self._logger.error("stream failed: %s", exc) except asyncio.CancelledError: self._logger.info("client shutting down") # cancelled, this means a clean shutdown is requested yield from self.stream.close() raise finally: self._logger.info("stopping stream") self.stream.stop() @asyncio.coroutine def _main(self): with contextlib.ExitStack() as stack: stack.enter_context( self.stream.on_failure.context_connect(self._stream_failure) ) stack.enter_context( self.stream.on_stream_destroyed.context_connect( self._stream_destroyed) ) while True: self._failure_future = asyncio.Future() try: yield from self._main_impl() except errors.StreamNegotiationFailure: if self.stream.sm_enabled: self.stream.stop_sm() raise except OSError: if self._backoff_time is None: self._backoff_time = self.backoff_start.total_seconds() yield from asyncio.sleep(self._backoff_time) self._backoff_time *= self.backoff_factor if self._backoff_time > self.backoff_cap.total_seconds(): self._backoff_time = self.backoff_cap.total_seconds() continue # retry
[docs] def start(self): """ Start the client. If it is already :attr:`running`, :class:`RuntimeError` is raised. While the client is running, it will try to keep an XMPP connection open to the server associated with :attr:`local_jid`. """ if self.running: raise RuntimeError("client already running") self._main_task = asyncio.async( self._main(), loop=self._loop ) self._main_task.add_done_callback(self._on_main_done)
[docs] def stop(self): """ Stop the client. This sends a signal to the clients main task which makes it terminate. It may take some cycles through the event loop to stop the client task. To check whether the task has actually stopped, query :attr:`running`. """ if not self.running: return self._main_task.cancel() # services
def _summon(self, class_): try: return self._services[class_] except KeyError: instance = class_(self) self._services[class_] = instance return instance
[docs] def summon(self, class_): """ Summon a :class:`~aioxmpp.service.Service` for the client. If the `class_` has already been summoned for the client, it’s instance is returned. Otherwise, all requirements for the class are first summoned (if they are not there already). Afterwards, the class itself is summoned and the instance is returned. """ requirements = sorted(class_.ORDER_AFTER) for req in requirements: self._summon(req) return self._summon(class_) # properties
@property
[docs] def local_jid(self): """ The :class:`~aioxmpp.structs.JID` the client currently has. While the client is disconnected, only the bare JID part is authentic, as the resource is ultimately determined by the server. Writing this attribute is not allowed, as changing the JID introduces a lot of issues with respect to reusability of the stream. Instanciate a new :class:`AbstractClient` if you need to change the bare part of the JID. .. note:: Changing the resource between reconnects may be allowed later. """ return self._local_jid
@property
[docs] def running(self): """ true if the client is currently running, false otherwise. """ return self._main_task is not None and not self._main_task.done()
@property
[docs] def established(self): """ true if the stream is currently established (as defined in :attr:`on_stream_established`) and false otherwise. """ return self._established
[docs]class PresenceManagedClient(AbstractClient): """ A presence managed XMPP client. The arguments are passed to the :class:`AbstractClient` constructor. While the start/stop interfaces of :class:`AbstractClient` are still available, it is recommended to control the presence managed client solely using the :attr:`presence` property. The initial presence is set to `unavailable`, thus, the client will not connect immediately. .. autoattribute:: presence .. automethod:: set_presence Signals: .. attribute:: on_presence_sent The event is fired after :attr:`~AbstractClient.on_stream_established` and after the current presence has been sent to the server as *initial presence*. """ on_presence_sent = callbacks.Signal() def __init__(self, jid, security_layer, **kwargs): super().__init__(jid, security_layer, **kwargs) self._presence = structs.PresenceState(), [] self.on_stream_established.connect(self._handle_stream_established) def _resend_presence(self): pres = stanza.Presence() state, status = self._presence state.apply_to_stanza(pres) pres.status.extend(status) pres.autoset_id() self.stream.enqueue_stanza(pres) def _handle_stream_established(self): self._resend_presence() self.on_presence_sent() def _update_presence(self): if self._presence[0].available: if not self.running: self.start() elif self.established: self._resend_presence() else: if self.running: self.stop() @property def presence(self): """ Control or query the current presence state (see :class:`~.structs.PresenceState`) of the client. Note that when reading, the property only returns the "set" value, not the actual value known to the server (and others). This may differ if the connection is still being established. .. seealso:: Setting the presence state using :attr:`presence` clears the `status` of the presence. To set the status and state at once, use :meth:`set_presence`. Upon setting this attribute, the :class:`PresenceManagedClient` will do whatever neccessary to achieve the given presence. If the presence is an `available` presence, the client will attempt to connect to the server. If the presence is `unavailable` and the client is currently connected, it will disconnect. Instead of setting the presence to unavailable, :meth:`stop` can also be called. The :attr:`presence` attribute is *not* affected by calls to :meth:`start` or :meth:`stop`. """ return self._presence[0] @presence.setter
[docs] def presence(self, value): self._presence = value, [] self._update_presence()
[docs] def set_presence(self, state, status): """ Set the presence `state` and `status` on the client. This has the same effects as writing `state` to :attr:`presence`, but the status of the presence is also set at the same time. `status` must be either a string or an iterable containing :class:`.stanza.Status` objects. The :class:`.stanza.Status` instances are saved and added to the presence stanza when it is time to send it. The `status` is the text shown alongside the `state` (indicating availability such as *away*, *do not disturb* and *free to chat*). """ if isinstance(status, str): status = [stanza.Status(status)] else: status = list(status) self._presence = state, status self._update_presence()

aioxmpp

Navigation