"""
:mod:`~aioxmpp.node` --- XMPP network nodes (clients, mostly)
#############################################################
This module contains functions to connect to an XMPP server, as well as
maintaining the stream. In addition, a client class which completely manages a
stream based on a presence setting is provided.
Using XMPP
==========
.. autoclass:: AbstractClient
.. autoclass:: PresenceManagedClient
Connecting streams low-level
============================
.. autofunction:: discover_connectors
.. autofunction:: connect_xmlstream
Utilities
=========
.. autoclass:: UseConnected
"""
import asyncio
import contextlib
import logging
from datetime import timedelta
import dns.resolver
import aiosasl
from . import (
connector,
network,
protocol,
errors,
stream,
callbacks,
nonza,
rfc3921,
rfc6120,
stanza,
structs,
security_layer,
)
from .utils import namespaces
logger = logging.getLogger(__name__)
@asyncio.coroutine
def lookup_addresses(loop, jid):
    """
    Look up and order the addresses of the host serving the domain of `jid`.

    This coroutine passes `loop` and ``jid.domain`` to
    :func:`.network.find_xmpp_host_addr` and returns the result of feeding the
    addresses found into :func:`.network.group_and_order_srv_records`.

    The function is explicitly marked as coroutine (like every other
    ``yield from`` based function in this module) so that it can also be used
    with ``await`` and not only with ``yield from``.
    """
    addresses = yield from network.find_xmpp_host_addr(loop, jid.domain)
    return network.group_and_order_srv_records(addresses)
@asyncio.coroutine
def discover_connectors(domain, loop=None, logger=logger):
    """
    Discover all connection options for a domain, in descending order of
    preference.

    This coroutine returns options discovered from SRV records, or if none are
    found, the generic option using the domain name and the default XMPP client
    port.

    Each option is represented by a triple ``(host, port, connector)``.
    `connector` is a :class:`aioxmpp.connector.BaseConnector` instance which is
    suitable to connect to the given host and port.

    `logger` is the logger used by the function. `loop` is accepted for
    interface symmetry, but it is not used by this function.

    The following sources are supported:

    * :rfc:`6120` SRV records (service ``xmpp-client``). One option using a
      :class:`~aioxmpp.connector.STARTTLSConnector` is returned per SRV
      record.

    * :xep:`368` SRV records (service ``xmpps-client``). One option using a
      :class:`~aioxmpp.connector.XMPPOverTLSConnector` is returned per SRV
      record.

    * :rfc:`6120` fallback process (only if both lookups return
      :data:`None`). One option is returned for the host name with the default
      XMPP client port (5222).

    A lookup which raises :class:`ValueError` is treated as "disabled". If the
    ``xmpp-client`` lookup is disabled and the ``xmpps-client`` lookup is
    either disabled too or returned :data:`None`, :class:`ValueError` is
    raised (the domain specifically said that XMPP is not supported here).

    The options discovered from SRV records are mixed together and then
    ordered by :func:`.network.group_and_order_srv_records` (by priority, and
    shuffled according to weight within a priority). Thus, if there are
    multiple records of equal priority, the result of the function is not
    deterministic.

    .. versionadded:: 0.6
    """
    records = {}
    disabled = {}

    # both lookups are always performed, STARTTLS first, before any decision
    # is made based on their results
    for service in ("xmpp-client", "xmpps-client"):
        try:
            records[service] = yield from network.lookup_srv(domain, service)
        except ValueError:
            records[service] = []
            disabled[service] = True
        else:
            disabled[service] = False

    starttls_records = records["xmpp-client"]
    tls_records = records["xmpps-client"]

    tls_unusable = disabled["xmpps-client"] or tls_records is None
    if disabled["xmpp-client"] and tls_unusable:
        raise ValueError(
            "XMPP not enabled on domain {!r}".format(domain),
        )

    if starttls_records is None and tls_records is None:
        # no SRV records published, fall back
        logger.debug(
            "no SRV records found for %s, falling back",
            domain,
        )
        return [
            (domain, 5222, connector.STARTTLSConnector()),
        ]

    # each record gets its own connector instance; a lookup result of None
    # contributes no candidates
    candidates = []
    for prio, weight, (host, port) in starttls_records or []:
        candidates.append(
            (prio, weight, (host, port, connector.STARTTLSConnector()))
        )
    for prio, weight, (host, port) in tls_records or []:
        candidates.append(
            (prio, weight, (host, port, connector.XMPPOverTLSConnector()))
        )

    options = list(network.group_and_order_srv_records(candidates))

    logger.debug(
        "options for %s: %r",
        domain,
        options,
    )

    return options
@asyncio.coroutine
def _try_options(options, exceptions,
                 jid, metadata, negotiation_timeout, loop, logger):
    """
    Helper function for :func:`connect_xmlstream`.

    Try each ``(host, port, connector)`` triple in `options` in order. For
    each option, the connector is asked to connect and, on success, SASL
    negotiation is attempted via :func:`.security_layer.negotiate_sasl`.

    :class:`OSError` raised while connecting and
    :class:`~.errors.SASLUnavailable` raised during SASL negotiation are
    appended to the `exceptions` list and the next option is tried. Any other
    exception raised during SASL negotiation closes the stream with a stream
    error and is re-raised.

    Return ``(transport, xmlstream, features)`` for the first option which
    succeeds, or :data:`None` if no option succeeded.
    """
    peer_domain = jid.domain

    for host, port, conn in options:
        logger.debug(
            "domain %s: trying to connect to %r:%s using %r",
            peer_domain, host, port, conn
        )

        try:
            connected = yield from conn.connect(
                loop,
                metadata,
                peer_domain,
                host,
                port,
                negotiation_timeout,
            )
        except OSError as exc:
            logger.warning(
                "connection failed: %s", exc
            )
            exceptions.append(exc)
            continue

        transport, xmlstream, features = connected

        logger.debug(
            "domain %s: connection succeeded using %r",
            peer_domain,
            conn,
        )

        try:
            negotiated = yield from security_layer.negotiate_sasl(
                transport,
                xmlstream,
                metadata.sasl_providers,
                negotiation_timeout,
                jid,
                features,
            )
        except errors.SASLUnavailable as exc:
            # no usable SASL mechanism on this peer: close the stream and
            # give the remaining options a chance
            protocol.send_stream_error_and_close(
                xmlstream,
                condition=(namespaces.streams, "policy-violation"),
                text=str(exc),
            )
            exceptions.append(exc)
            continue
        except Exception as exc:
            # anything else is not retried: close the stream and propagate
            protocol.send_stream_error_and_close(
                xmlstream,
                condition=(namespaces.streams, "undefined-condition"),
                text=str(exc),
            )
            raise
        else:
            return transport, xmlstream, negotiated

    return None
@asyncio.coroutine
def connect_xmlstream(
        jid,
        metadata,
        negotiation_timeout=60.,
        override_peer=(),
        loop=None,
        logger=logger):
    """
    Prepare and connect a :class:`aioxmpp.protocol.XMLStream` to a server
    responsible for the given `jid` and authenticate against that server using
    the SASL mechanisms described in `metadata`.

    The part of the `metadata` (which must be a
    :class:`.security_layer.SecurityLayer` object) specifying the use of TLS is
    applied. If the security layer does not mandate TLS, the resulting XML
    stream may not be using TLS. TLS is used whenever possible.

    `negotiation_timeout` is passed on to the connectors and to the SASL
    negotiation.

    `override_peer` may be a sequence of triples consisting of ``(host, port,
    connector)``, where `connector` is a
    :class:`aioxmpp.connector.BaseConnector` instance. The options in the
    sequence are tried first (in the order given), and only if all of them
    fail, automatic discovery of connection options is performed. The argument
    is copied and never modified.

    `loop` may be a :class:`asyncio.BaseEventLoop` to use. Defaults to the
    current event loop.

    If the domain of `jid` announces that XMPP is not supported at all,
    :class:`ValueError` is raised (by :func:`discover_connectors`). If no
    options are returned from :func:`discover_connectors` and `override_peer`
    is empty, :class:`ValueError` is raised, too.

    If all connection attempts fail, :class:`aioxmpp.errors.MultiOSError` is
    raised. The error contains one exception for each failed option, with
    those from `override_peer` first, followed by the discovered options, in
    the order they were tried.

    .. note::

       Even though it is a :class:`aioxmpp.errors.MultiOSError`, it may also
       contain exceptions which are not plain :class:`OSError` instances, such
       as :class:`aioxmpp.errors.SASLUnavailable` (and, presumably, TLS
       related errors raised by the connectors as :class:`OSError`
       subclasses).

       Such a problem is treated like any other connection problem and the
       other connection options are considered.

    Return a triple ``(transport, xmlstream, features)``. `transport` is
    the underlying :class:`asyncio.Transport` which is used for the `xmlstream`
    :class:`~.protocol.XMLStream` instance. `features` is the
    :class:`aioxmpp.nonza.StreamFeatures` instance describing the features of
    the stream.

    .. versionadded:: 0.6
    """
    loop = asyncio.get_event_loop() if loop is None else loop

    # discovery operates on the IDNA-encoded (bytes) form of the domain
    domain = jid.domain.encode("idna")

    # first round: explicitly configured peers take precedence
    options = list(override_peer)
    exceptions = []

    result = yield from _try_options(
        options,
        exceptions,
        jid, metadata, negotiation_timeout, loop, logger,
    )
    if result is not None:
        return result

    # second round: options discovered via SRV records / fallback
    options = list((yield from discover_connectors(
        domain,
        loop=loop,
        logger=logger,
    )))

    result = yield from _try_options(
        options,
        exceptions,
        jid, metadata, negotiation_timeout, loop, logger,
    )
    if result is not None:
        return result

    if not options and not override_peer:
        # nothing was attempted at all, so there are no errors to report
        raise ValueError("no options to connect to XMPP domain {!r}".format(
            jid.domain
        ))

    raise errors.MultiOSError(
        "failed to connect to XMPP domain {!r}".format(jid.domain),
        exceptions
    )
class AbstractClient:
    """
    The :class:`AbstractClient` is a base class for implementing XMPP client
    classes. These classes deal with managing the
    :class:`~aioxmpp.stream.StanzaStream` and the underlying
    :class:`~aioxmpp.protocol.XMLStream` instances. The abstract client
    provides functionality for connecting the xmlstream as well as signals
    which indicate changes in the stream state.

    The `jid` must be a :class:`~aioxmpp.structs.JID` for which to connect. The
    `security_layer` is best created using the
    :func:`~aioxmpp.security_layer.security_layer` function and must provide
    authentication for the given `jid`.

    The `negotiation_timeout` argument controls the :attr:`negotiation_timeout`
    attribute.

    If `loop` is given, it must be a :class:`asyncio.BaseEventLoop`
    instance. If it is not given, the current event loop is used.

    `override_peer` is used to initialise the :attr:`override_peer` attribute.

    As a glue between the stanza stream and the XML stream, it also knows about
    stream management and performs stream management negotiation. It is
    specialized on client operations, which implies that it will try to keep
    the stream alive as long as wished by the client.

    In general, there are no fatal errors (aside from stream negotiation
    problems) which stop a :class:`AbstractClient` from working. It makes use
    of stream management as far as possible and abstracts away the gritty low
    level details. In general, it is sufficient to observe the
    :attr:`on_stream_established` and :attr:`on_stream_destroyed` events, which
    notify a user about when a stream becomes available and when it becomes
    unavailable.

    If authentication fails (or another stream negotiation error occurs), the
    client fails and :attr:`on_failure` is fired. :attr:`running` becomes false
    and the client needs to be re-started manually by calling :meth:`start`.

    .. versionchanged:: 0.4

       Since 0.4, support for legacy XMPP sessions has been implemented. Mainly
       for compatibility with ejabberd.

    Controlling the client:

    .. automethod:: start

    .. automethod:: stop

    .. autoattribute:: running

    .. attribute:: negotiation_timeout = timedelta(seconds=60)

       The timeout applied to the connection process and the individual steps
       of negotiating the stream. See the `negotiation_timeout` argument to
       :func:`connect_xmlstream`.

    .. attribute:: override_peer

       A sequence of triples ``(host, port, connector)``, where `host` must be
       a host name or IP as string, `port` must be a port number and
       `connector` must be a :class:`aioxmpp.connector.BaseConnector` instance.

       These connection options are passed to :func:`connect_xmlstream` and
       thus take precedence over the options discovered using
       :func:`discover_connectors`.

       .. note::

          If Stream Management is used and the peer server provided a location
          to connect to on resumption, that location is preferred even over the
          options set here.

       .. versionadded:: 0.6

    Connection information:

    .. autoattribute:: established

    .. autoattribute:: local_jid

    .. attribute:: stream

       The :class:`~aioxmpp.stream.StanzaStream` instance used by the node.

    .. attribute:: stream_features

       An instance of :class:`~aioxmpp.nonza.StreamFeatures`. This is the
       most-recently received stream features information (the one received
       right before resource binding).

       While no stream has been established yet, this is :data:`None`. During
       transparent re-negotiation, that information may be obsolete. However,
       when :attr:`before_stream_established` fires, the information is
       up-to-date.

    Exponential backoff on interruptions:

    .. attribute:: backoff_start

       When an underlying XML stream fails due to connectivity issues (generic
       :class:`OSError` raised), exponential backoff takes place before
       attempting to reconnect.

       The initial time to wait before reconnecting is described by
       :attr:`backoff_start`.

    .. attribute:: backoff_factor

       Each subsequent time a connection fails, the previous backoff time is
       multiplied with :attr:`backoff_factor`.

    .. attribute:: backoff_cap

       The backoff time is capped to :attr:`backoff_cap`, to avoid having
       unrealistically high values.

    Signals:

    .. signal:: on_failure(err)

       This signal is fired when the client fails and stops.

    .. syncsignal:: before_stream_established()

       This coroutine signal is executed right before
       :meth:`on_stream_established` fires.

    .. signal:: on_stopped()

       Fires when the client stops gracefully. This is the counterpart to
       :meth:`on_failure`.

    .. signal:: on_stream_established()

       When the stream is established and resource binding took place, this
       event is fired. It means that the stream can now be used for XMPP
       interactions.

    .. signal:: on_stream_destroyed()

       This is called whenever a stream is destroyed. The conditions for this
       are the same as for
       :attr:`aioxmpp.stream.StanzaStream.on_stream_destroyed`.

       This event can be used to know when to discard all state about the XMPP
       connection, such as roster information.

    Services:

    .. automethod:: summon

    Miscellaneous:

    .. attribute:: logger

       The :class:`logging.Logger` instance which is used by the
       :class:`AbstractClient`. This is the `logger` passed to the constructor
       or a logger derived from the fully qualified name of the class.

       .. versionadded:: 0.6

          The :attr:`logger` attribute was added.

    """

    on_failure = callbacks.Signal()
    on_stopped = callbacks.Signal()
    on_stream_destroyed = callbacks.Signal()
    on_stream_established = callbacks.Signal()

    before_stream_established = callbacks.SyncSignal()

    def __init__(self,
                 local_jid,
                 security_layer,
                 negotiation_timeout=timedelta(seconds=60),
                 override_peer=(),
                 loop=None,
                 logger=None):
        super().__init__()
        self._local_jid = local_jid
        self._loop = loop or asyncio.get_event_loop()
        self._main_task = None
        self._security_layer = security_layer

        # bound to the client's loop so that it can be waited for from the
        # main task even if that loop is not the default event loop
        self._failure_future = asyncio.Future(loop=self._loop)
        self.logger = (logger or
                       logging.getLogger(".".join([
                           type(self).__module__,
                           type(self).__qualname__,
                       ])))

        # None means "no backoff in progress"; see _main()
        self._backoff_time = None

        self._established = False

        # maps service classes to their summoned instances
        self._services = {}

        self.stream_features = None

        self.negotiation_timeout = negotiation_timeout
        self.backoff_start = timedelta(seconds=1)
        self.backoff_factor = 1.2
        self.backoff_cap = timedelta(seconds=60)
        # copy, so that later changes by the caller do not affect the client
        self.override_peer = list(override_peer)

        self.on_stopped.logger = self.logger.getChild("on_stopped")
        self.on_failure.logger = self.logger.getChild("on_failure")
        self.on_stream_established.logger = \
            self.logger.getChild("on_stream_established")
        self.on_stream_destroyed.logger = \
            self.logger.getChild("on_stream_destroyed")

        self.stream = stream.StanzaStream(local_jid.bare())

    def _stream_failure(self, exc):
        """
        Handler for the ``on_failure`` signal of the stanza stream.

        Hand `exc` over to the main task via the failure future and replace
        the future with a fresh one.
        """
        if self._failure_future.done():
            self.logger.warning(
                "something is odd: failure future is already done ..."
            )
            return

        # the exception is deliberately passed as *result*: _main_impl()
        # retrieves it with ``yield from`` and raises it itself
        self._failure_future.set_result(exc)
        self._failure_future = asyncio.Future(loop=self._loop)

    def _stream_destroyed(self):
        """
        Handler for the ``on_stream_destroyed`` signal of the stanza stream.

        Fire :attr:`on_stream_destroyed` if (and only if) the stream was
        established before.
        """
        if self._established:
            self._established = False
            self.on_stream_destroyed()

    def _on_bind_done(self, task):
        """
        Done-callback for a resource binding task.

        Cancellation is ignored; any other exception cancels the main task
        and fires :attr:`on_failure`.
        """
        try:
            task.result()
        except asyncio.CancelledError:
            pass
        except Exception as err:
            self.logger.error("resource binding failed: %r", err)
            self._main_task.cancel()
            self.on_failure(err)

    def _on_main_done(self, task):
        """
        Done-callback of the main task.

        Fire :attr:`on_stopped` if the task was cancelled (which is how
        :meth:`stop` terminates it) and :attr:`on_failure` if it terminated
        with an exception.
        """
        try:
            task.result()
        except asyncio.CancelledError:
            # task terminated normally
            self.on_stopped()
        except Exception as err:
            self.logger.exception("main failed")
            self.on_failure(err)

    @asyncio.coroutine
    def _try_resume_stream_management(self, xmlstream, features):
        """
        Try to resume stream management on `xmlstream`.

        Return true if resumption succeeded and false if it failed with a
        :class:`~.errors.StreamNegotiationFailure`.
        """
        try:
            yield from self.stream.resume_sm(xmlstream)
        except errors.StreamNegotiationFailure as exc:
            self.logger.warning("failed to resume stream (%s)",
                                exc)
            return False
        return True

    @asyncio.coroutine
    def _negotiate_legacy_session(self):
        """
        Establish a legacy (:rfc:`3921`) session by sending a session IQ and
        waiting for the reply.
        """
        self.logger.debug(
            "remote server announces support for legacy sessions"
        )
        yield from self.stream.send_iq_and_wait_for_reply(
            stanza.IQ(type_="set",
                      payload=rfc3921.Session())
        )
        self.logger.debug(
            "legacy session negotiated (upgrade your server!)"
        )

    @asyncio.coroutine
    def _negotiate_stream(self, xmlstream, features):
        """
        Bring the stanza stream up on top of the freshly connected
        `xmlstream`, using the stream `features` announced by the server.

        If stream management is enabled, resumption is attempted first. If
        that succeeds, nothing else is done. Otherwise, the stanza stream is
        started, the resource is bound, stream management is started (if the
        server announces it), a legacy session is negotiated (if the server
        announces it) and finally :attr:`before_stream_established` and
        :attr:`on_stream_established` are fired.

        Return a pair ``(features, resumed)``.
        """
        server_can_do_sm = True
        try:
            features[nonza.StreamManagementFeature]
        except KeyError:
            if self.stream.sm_enabled:
                self.logger.warning("server isn’t advertising SM anymore")
                self.stream.stop_sm()
            server_can_do_sm = False

        self.logger.debug("negotiating stream (server_can_do_sm=%s)",
                          server_can_do_sm)

        if self.stream.sm_enabled:
            resumed = yield from self._try_resume_stream_management(
                xmlstream, features)
            if resumed:
                return features, resumed
        else:
            resumed = False

        self.stream_features = features
        self.stream.start(xmlstream)

        if not resumed:
            self.logger.debug("binding to resource")
            yield from self._bind()

        if server_can_do_sm:
            self.logger.debug("attempting to start stream management")
            try:
                yield from self.stream.start_sm()
            except errors.StreamNegotiationFailure:
                # not fatal: the stream is simply used without SM
                self.logger.debug("stream management failed to start")
            else:
                self.logger.debug("stream management started")

        try:
            features[rfc3921.SessionFeature]
        except KeyError:
            pass  # yay
        else:
            yield from self._negotiate_legacy_session()

        self._established = True

        yield from self.before_stream_established()
        self.on_stream_established()

        return features, resumed

    @asyncio.coroutine
    def _bind(self):
        """
        Bind the resource of the local JID and store the JID returned by the
        server as the new local JID.

        Raise :class:`~.errors.StreamNegotiationFailure` if the server replies
        with an error.
        """
        iq = stanza.IQ(type_="set")
        iq.payload = rfc6120.Bind(resource=self._local_jid.resource)
        try:
            result = yield from self.stream.send_iq_and_wait_for_reply(iq)
        except errors.XMPPError as exc:
            raise errors.StreamNegotiationFailure(
                "Resource binding failed: {}".format(exc)
            ) from exc

        self._local_jid = result.jid
        self.logger.info("bound to jid: %s", self._local_jid)

    @asyncio.coroutine
    def _main_impl(self):
        """
        Run a single connection: connect, negotiate the stream and then wait
        until the stream fails (in which case the failure is raised) or the
        task is cancelled (in which case the stream is closed cleanly).

        The stanza stream is stopped in any case when this coroutine exits
        after the XML stream has been connected.
        """
        # capture the future which belongs to this connection attempt;
        # _stream_failure() replaces the attribute after using it
        failure_future = self._failure_future

        override_peer = []
        if self.stream.sm_enabled:
            # a location announced by the server for SM resumption takes
            # precedence over the configured peers
            sm_location = self.stream.sm_location
            if sm_location:
                override_peer.append((
                    str(sm_location[0]),
                    sm_location[1],
                    connector.STARTTLSConnector(),
                ))
        override_peer += self.override_peer

        _, xmlstream, features = yield from connect_xmlstream(
            self._local_jid,
            self._security_layer,
            negotiation_timeout=self.negotiation_timeout.total_seconds(),
            override_peer=override_peer,
            loop=self._loop,
            logger=self.logger)

        try:
            features, _ = yield from self._negotiate_stream(
                xmlstream,
                features)

            # the connection is up: the next failure starts backoff afresh
            self._backoff_time = None

            exc = yield from failure_future
            self.logger.error("stream failed: %s", exc)
            raise exc
        except asyncio.CancelledError:
            self.logger.info("client shutting down (on request)")
            # cancelled, this means a clean shutdown is requested
            yield from self.stream.close()
            raise
        finally:
            self.logger.info("stopping stream")
            self.stream.stop()

    @asyncio.coroutine
    def _main(self):
        """
        Main task of the client: keep (re-)connecting until the task is
        cancelled or a fatal error occurs.

        * A :class:`~.errors.StreamError` with the ``conflict`` condition is
          fatal; after any other stream error, a reconnect is attempted
          immediately.
        * Stream negotiation and SASL failures are fatal; stream management
          is stopped before they are re-raised.
        * :class:`OSError` and :class:`dns.resolver.NoNameservers` trigger a
          reconnect after an exponentially growing, capped delay.
        """
        with contextlib.ExitStack() as stack:
            stack.enter_context(
                self.stream.on_failure.context_connect(self._stream_failure)
            )
            stack.enter_context(
                self.stream.on_stream_destroyed.context_connect(
                    self._stream_destroyed)
            )
            while True:
                self._failure_future = asyncio.Future(loop=self._loop)
                try:
                    yield from self._main_impl()
                except errors.StreamError as err:
                    if err.condition == (namespaces.streams, "conflict"):
                        self.logger.debug("conflict!")
                        raise
                except (errors.StreamNegotiationFailure,
                        aiosasl.SASLError):
                    if self.stream.sm_enabled:
                        self.stream.stop_sm()
                    raise
                except (OSError, dns.resolver.NoNameservers):
                    if self._backoff_time is None:
                        self._backoff_time = self.backoff_start.total_seconds()
                    yield from asyncio.sleep(self._backoff_time)
                    # grow the delay for the next failure, but never beyond
                    # the configured cap
                    self._backoff_time = min(
                        self._backoff_time * self.backoff_factor,
                        self.backoff_cap.total_seconds(),
                    )
                    continue  # retry

    def start(self):
        """
        Start the client. If it is already :attr:`running`,
        :class:`RuntimeError` is raised.

        While the client is running, it will try to keep an XMPP connection
        open to the server associated with :attr:`local_jid`.
        """
        if self.running:
            raise RuntimeError("client already running")

        # ``asyncio.async`` cannot be used: ``async`` is a reserved keyword
        # since Python 3.7, which makes that spelling a syntax error
        self._main_task = asyncio.ensure_future(
            self._main(),
            loop=self._loop
        )
        self._main_task.add_done_callback(self._on_main_done)

    def stop(self):
        """
        Stop the client. This sends a signal to the clients main task which
        makes it terminate.

        It may take some cycles through the event loop to stop the client
        task. To check whether the task has actually stopped, query
        :attr:`running`.

        If the client is not running, this does nothing.
        """
        if not self.running:
            return

        self.logger.debug("stopping main task of %r", self, stack_info=True)
        self._main_task.cancel()

    # services

    def _summon(self, class_):
        """
        Return the instance of `class_` for this client, creating (and
        caching) it if it does not exist yet.
        """
        try:
            return self._services[class_]
        except KeyError:
            instance = class_(self, logger_base=self.logger)
            self._services[class_] = instance
            return instance

    def summon(self, class_):
        """
        Summon a :class:`~aioxmpp.service.Service` for the client.

        If the `class_` has already been summoned for the client, its instance
        is returned.

        Otherwise, all requirements for the class (its ``ORDER_AFTER``
        attribute, in sorted order) are first summoned (if they are not there
        already). Afterwards, the class itself is summoned and the instance is
        returned.
        """
        requirements = sorted(class_.ORDER_AFTER)
        for req in requirements:
            self._summon(req)
        return self._summon(class_)

    # properties

    @property
    def local_jid(self):
        """
        The :class:`~aioxmpp.structs.JID` the client currently has. While the
        client is disconnected, only the bare JID part is authentic, as the
        resource is ultimately determined by the server.

        Writing this attribute is not allowed, as changing the JID introduces a
        lot of issues with respect to reusability of the stream. Instantiate a
        new :class:`AbstractClient` if you need to change the bare part of the
        JID.

        .. note::

           Changing the resource between reconnects may be allowed later.
        """
        return self._local_jid

    @property
    def running(self):
        """
        true if the client is currently running, false otherwise.
        """
        return self._main_task is not None and not self._main_task.done()

    @property
    def established(self):
        """
        true if the stream is currently established (as defined in
        :attr:`on_stream_established`) and false otherwise.
        """
        return self._established
class PresenceManagedClient(AbstractClient):
    """
    A presence managed XMPP client. The arguments are passed to the
    :class:`AbstractClient` constructor.

    While the start/stop interfaces of :class:`AbstractClient` are still
    available, it is recommended to control the presence managed client solely
    using the :attr:`presence` property.

    The initial presence is a default-constructed
    :class:`~.structs.PresenceState` (documented upstream as `unavailable`);
    thus, the client will not connect immediately.

    .. autoattribute:: presence

    .. automethod:: set_presence

    .. automethod:: connected

    Signals:

    .. attribute:: on_presence_sent

       The event is fired after :attr:`~AbstractClient.on_stream_established`
       and after the current presence has been sent to the server as *initial
       presence*.

    """

    on_presence_sent = callbacks.Signal()

    def __init__(self, jid, security_layer, **kwargs):
        super().__init__(jid, security_layer, **kwargs)
        # pair of (presence state, status); the status starts out empty
        self._presence = (structs.PresenceState(), [])
        self.on_stream_established.connect(self._handle_stream_established)

    def _resend_presence(self):
        """
        Build a presence stanza from the current state and status and enqueue
        it on the stanza stream.
        """
        state, status = self._presence
        presence_stanza = stanza.Presence()
        state.apply_to_stanza(presence_stanza)
        presence_stanza.status.update(status)
        self.stream.enqueue_stanza(presence_stanza)

    def _handle_stream_established(self):
        """
        Send the presence (if it is an available one) as soon as the stream is
        established and fire :attr:`on_presence_sent` afterwards.
        """
        state, _ = self._presence
        if state.available:
            self._resend_presence()
        self.on_presence_sent()

    def _update_presence(self):
        """
        Make the client follow the configured presence: stop it for an
        unavailable presence; start it, or re-send the presence on an
        established stream, for an available presence.
        """
        state, _ = self._presence

        if not state.available:
            if self.running:
                self.stop()
            return

        if not self.running:
            self.start()
        elif self.established:
            self._resend_presence()

    @property
    def presence(self):
        """
        Control or query the current presence state (see
        :class:`~.structs.PresenceState`) of the client. Note that when
        reading, the property only returns the "set" value, not the actual
        value known to the server (and others). This may differ if the
        connection is still being established.

        .. note::

           Setting the presence state using :attr:`presence` clears the
           `status` of the presence. To set the status and state at once,
           use :meth:`set_presence`.

        Upon setting this attribute, the :class:`PresenceManagedClient` will do
        whatever necessary to achieve the given presence. If the presence is
        an `available` presence, the client will attempt to connect to the
        server. If the presence is `unavailable` and the client is currently
        running, it will be stopped.

        Instead of setting the presence to unavailable, :meth:`stop` can also
        be called. The :attr:`presence` attribute is *not* affected by calls to
        :meth:`start` or :meth:`stop`.
        """
        state, _ = self._presence
        return state

    @presence.setter
    def presence(self, value):
        self._presence = (value, [])
        self._update_presence()

    def set_presence(self, state, status):
        """
        Set the presence `state` and `status` on the client. This has the same
        effects as writing `state` to :attr:`presence`, but the status of the
        presence is also set at the same time.

        `status` must be either a string or something which can be passed to
        :class:`dict`. A string is stored as the status text for the
        :data:`None` key. The stored mapping is merged into the ``status`` of
        the presence stanza when it is time to send it.

        The `status` is the text shown alongside the `state` (indicating
        availability such as *away*, *do not disturb* and *free to chat*).
        """
        status_map = (
            {None: status} if isinstance(status, str) else dict(status)
        )
        self._presence = (state, status_map)
        self._update_presence()

    def connected(self, **kwargs):
        """
        Return an asynchronous context manager (:pep:`492`). When it is
        entered, the presence is changed to available. The context manager
        waits until the stream is established, and then the context is entered.

        Upon leaving the context manager, the presence is changed to
        unavailable. The context manager waits until the stream is closed
        fully.

        The keyword arguments are passed to the :class:`UseConnected` context
        manager constructor.

        .. seealso::

           :class:`UseConnected` is the context manager returned here.

        .. versionadded:: 0.6
        """
        return UseConnected(self, **kwargs)
class UseConnected:
    """
    Control the given :class:`PresenceManagedClient` `client` as asynchronous
    context manager (:pep:`492`). :class:`UseConnected` is an asynchronous
    context manager. When the context manager is entered, the `client` is set
    to an available presence (if the stream is not already established) and the
    context manager waits for a connection to establish. If a fatal error
    occurs while the stream is being established, it is re-raised.

    When the context manager is left, the stream is shut down cleanly and the
    context manager waits for the stream to shut down. Any exceptions occurring
    in the context are not swallowed.

    `timeout` is used to initialise the :attr:`timeout` attribute. `presence`
    is used to initialise the :attr:`presence` attribute and defaults to a
    simple available presence. `presence` must be an *available* presence.

    .. versionadded:: 0.6

    The following attributes control the behaviour of the context manager:

    .. attribute:: timeout

       Either :data:`None` or a :class:`datetime.timedelta` instance. If it is
       the latter and it takes longer than that time to establish the stream,
       the process is aborted and :class:`TimeoutError` is raised.

    .. autoattribute:: presence

    """

    def __init__(self, client, *,
                 timeout=None,
                 presence=structs.PresenceState(True)):
        super().__init__()
        self._client = client
        self.timeout = timeout
        self.presence = presence

    @property
    def presence(self):
        """
        This is the presence which is set on the client when connecting.

        The class documentation asks for an *available* presence; note that
        no validation is performed when this attribute is written.
        """
        return self._presence

    @presence.setter
    def presence(self, value):
        self._presence = value

    @asyncio.coroutine
    def __aenter__(self):
        """
        Set the presence on the client, start the client if needed and wait
        until the presence has been sent or the client failed.

        Return the stanza stream of the client. If the stream is already
        established, it is returned right away and the presence is left
        untouched.

        Raise :class:`TimeoutError` if :attr:`timeout` is set and elapses;
        in that case, the client is set to unavailable and stopped.
        """
        if self._client.established:
            return self._client.stream

        connected_future = asyncio.Future()

        self._client.presence = self.presence

        if not self._client.running:
            self._client.start()

        # whichever signal fires first resolves the future; on_failure
        # presumably turns its argument into the exception of the future --
        # TODO confirm against the AUTO_FUTURE implementation
        self._client.on_presence_sent.connect(
            connected_future,
            self._client.on_presence_sent.AUTO_FUTURE
        )
        self._client.on_failure.connect(
            connected_future,
            self._client.on_failure.AUTO_FUTURE
        )

        if self.timeout is not None:
            try:
                yield from asyncio.wait_for(
                    connected_future,
                    timeout=self.timeout.total_seconds()
                )
            except asyncio.TimeoutError:
                self._client.presence = structs.PresenceState(False)
                self._client.stop()
                raise TimeoutError()
        else:
            yield from connected_future

        return self._client.stream

    @asyncio.coroutine
    def __aexit__(self, exc_type, exc_value, exc_traceback):
        """
        Set the client to unavailable and, if the stream is still
        established, wait until the client has stopped or failed.

        An error during shutdown is only raised if the context itself did not
        raise; otherwise the exception from the context takes precedence.
        Nothing truthy is returned, so exceptions from the context are never
        swallowed.
        """
        self._client.presence = structs.PresenceState(False)

        if not self._client.established:
            return

        disconnected_future = asyncio.Future()

        self._client.on_stopped.connect(
            disconnected_future,
            self._client.on_stopped.AUTO_FUTURE
        )
        self._client.on_failure.connect(
            disconnected_future,
            self._client.on_failure.AUTO_FUTURE
        )

        try:
            yield from disconnected_future
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt,
            # SystemExit and the like must never be discarded here. Ordinary
            # shutdown errors are suppressed only if the context body raised,
            # so that the original exception is the one which propagates.
            if exc_type is None:
                raise