Source code for aioxmpp.callbacks

"""
:mod:`~aioxmpp.callbacks` -- Synchronous and asynchronous callbacks
###################################################################

This module provides facilities for objects to provide signals to which other
objects can connect.

Descriptor vs. ad-hoc
=====================

Descriptors can be used as class attributes and will create ad-hoc signals
dynamically for each instance. They are the most commonly used:

.. code-block:: python

   class Emitter:
       on_event = callbacks.Signal()

   def handler():
       pass

   emitter1 = Emitter()
   emitter2 = Emitter()
   emitter1.on_event.connect(handler)

   emitter1.on_event()  # calls `handler`
   emitter2.on_event()  # does not call `handler`

   # the actual signals are distinct
   assert emitter1.on_event is not emitter2.on_event

Ad-hoc signals are useful for testing and are the type of which the actual
fields are.

Class overview
==============

.. autosummary::

   Signal
   SyncSignal
   AdHocSignal
   SyncAdHocSignal

Signal implementations (ad-hoc signals)
---------------------------------------

Whenever accessing an attribute using the :class:`Signal` or
:class:`SyncSignal` descriptors, an object of one of the following classes is
returned. This is where the behaviour of the signals is specified.

.. autoclass:: AdHocSignal

.. autoclass:: SyncAdHocSignal

Signal descriptors
------------------

These descriptors can be used on classes to have attributes which are signals:

.. autoclass:: Signal

.. autoclass:: SyncSignal

"""

import abc
import asyncio
import collections
import functools
import logging
import types
import weakref


logger = logging.getLogger(__name__)


class TagListener:
    def __init__(self, ondata, onerror=None):
        self._ondata = ondata
        self._onerror = onerror

    def data(self, data):
        return self._ondata(data)

    def error(self, exc):
        if self._onerror is not None:
            return self._onerror(exc)

    def is_valid(self):
        return True


class AsyncTagListener(TagListener):
    def __init__(self, ondata, onerror=None, *, loop=None):
        super().__init__(ondata, onerror)
        self._loop = loop or asyncio.get_event_loop()

    def data(self, data):
        self._loop.call_soon(self._ondata, data)

    def error(self, exc):
        if self._onerror is not None:
            self._loop.call_soon(self._onerror, exc)


class OneshotTagListener(TagListener):
    def data(self, data):
        super().data(data)
        return True

    def error(self, exc):
        super().error(exc)
        return True


class OneshotAsyncTagListener(OneshotTagListener, AsyncTagListener):
    pass


class FutureListener:
    def __init__(self, fut):
        self.fut = fut

    def data(self, data):
        try:
            self.fut.set_result(data)
        except asyncio.futures.InvalidStateError:
            pass
        return True

    def error(self, exc):
        try:
            self.fut.set_exception(exc)
        except asyncio.futures.InvalidStateError:
            pass
        return True

    def is_valid(self):
        return not self.fut.done()


class TagDispatcher:
    def __init__(self):
        self._listeners = {}

    def add_callback(self, tag, fn):
        return self.add_listener(tag, TagListener(fn))

    def add_callback_async(self, tag, fn, *, loop=None):
        return self.add_listener(
            tag,
            AsyncTagListener(fn, loop=loop)
        )

    def add_future(self, tag, fut):
        return self.add_listener(
            tag,
            FutureListener(fut)
        )

    def add_listener(self, tag, listener):
        try:
            existing = self._listeners[tag]
            if not existing.is_valid():
                raise KeyError()
        except KeyError:
            self._listeners[tag] = listener
        else:
            raise ValueError("only one listener is allowed per tag")

    def unicast(self, tag, data):
        cb = self._listeners[tag]
        if not cb.is_valid():
            del self._listeners[tag]
            self._listeners[tag]
        if cb.data(data):
            del self._listeners[tag]

    def unicast_error(self, tag, exc):
        cb = self._listeners[tag]
        if not cb.is_valid():
            del self._listeners[tag]
            self._listeners[tag]
        if cb.error(exc):
            del self._listeners[tag]

    def remove_listener(self, tag):
        del self._listeners[tag]

    def broadcast_error(self, exc):
        for tag, listener in list(self._listeners.items()):
            if listener.is_valid() and listener.error(exc):
                del self._listeners[tag]

    def close_all(self, exc):
        self.broadcast_error(exc)
        self._listeners.clear()


class AbstractAdHocSignal:
    def __init__(self):
        super().__init__()
        self._connections = collections.OrderedDict()
        self.logger = logger

    def _connect(self, wrapper):
        token = object()
        self._connections[token] = wrapper
        return token

    def disconnect(self, token):
        """
        Disconnect the connection identified by `token`. This never raises,
        even if an invalid `token` is passed.
        """
        try:
            del self._connections[token]
        except KeyError:
            pass


[docs]class AdHocSignal(AbstractAdHocSignal): """ An ad-hoc signal is a single emitter. This is where callables are connected to, using the :meth:`connect` method of the :class:`AdHocSignal`. .. automethod:: fire .. automethod:: connect .. automethod:: future .. attribute:: logger This may be a :class:`logging.Logger` instance to allow the signal to log errors and debug events to a specific logger instead of the default logger (``aioxmpp.callbacks``). This attribute must not be :data:`None`, and it is initialised to the default logger on creation of the :class:`AdHocSignal`. The different ways callables can be connected to an ad-hoc signal are shown below: .. attribute:: STRONG Connections using this mode keep a strong reference to the callable. The callable is called directly, thus blocking the emission of the signal. .. attribute:: WEAK Connections using this mode keep a weak reference to the callable. The callable is executed directly, thus blocking the emission of the signal. If the weak reference is dead, it is automatically removed from the signals connection list. If the callable is a bound method, :class:`weakref.WeakMethod` is used automatically. For both :attr:`STRONG` and :attr:`WEAK` holds: if the callable returns a true value, it is disconnected from the signal. .. classmethod:: ASYNC_WITH_LOOP(loop) This mode requires an :mod:`asyncio` event loop as argument. When the signal is emitted, the callable is not called directly. Instead, it is enqueued for calling with the event loop using :meth:`asyncio.BaseEventLoop.call_soon`. If :data:`None` is passed as `loop`, the loop is obtained from :func:`asyncio.get_event_loop` at connect time. A strong reference is held to the callable. Connections using this mode are never removed automatically from the signals connection list. You have to use :meth:`disconnect` explicitly. .. attribute:: AUTO_FUTURE Instead of a callable, a :class:`asyncio.Future` must be passed when using this mode. This mode can only be used for signals which send at most one argument. If no argument is sent, the :meth:`~asyncio.Future.set_result` method is called with :data:`None`. If one argument is sent and it is an instance of :class:`Exception`, it is passed to :meth:`~asyncio.Future.set_exception`. Otherwise, if one argument is sent, it is passed to :meth:`~asyncio.Future.set_exception`. In any case, the future is removed after the next emission of the signal. .. classmethod:: SPAWN_WITH_LOOP(loop) This mode requires an :mod:`asyncio` event loop as argument and a coroutine to be passed to :meth:`connect`. If :data:`None` is passed as `loop`, the loop is obtained from :func:`asyncio.get_event_loop` at connect time. When the signal is emitted, the coroutine is spawned using :func:`asyncio.async` in the given `loop`, with the arguments passed to the signal. A strong reference is held to the coroutine. Connections using this mode are never removed automatically from the signals connection list. You have to use :meth:`disconnect` explicitly. .. versionadded:: 0.6 .. automethod:: disconnect """ @classmethod
[docs] def STRONG(cls, f): if not hasattr(f, "__call__"): raise TypeError("must be callable, got {!r}".format(f)) return functools.partial(cls._strong_wrapper, f)
@classmethod
[docs] def ASYNC_WITH_LOOP(cls, loop): if loop is None: loop = asyncio.get_event_loop() def create_wrapper(f): if not hasattr(f, "__call__"): raise TypeError("must be callable, got {!r}".format(f)) return functools.partial(cls._async_wrapper, f, loop) return create_wrapper
@classmethod
[docs] def WEAK(cls, f): if not hasattr(f, "__call__"): raise TypeError("must be callable, got {!r}".format(f)) if isinstance(f, types.MethodType): ref = weakref.WeakMethod(f) else: ref = weakref.ref(f) return functools.partial(cls._weakref_wrapper, ref)
@classmethod
[docs] def AUTO_FUTURE(cls, f): def future_wrapper(args, kwargs): if kwargs: raise TypeError("keyword arguments not supported") if len(args) > 0: try: arg, = args except ValueError: raise TypeError("too many arguments") from None else: arg = None if f.done(): return if isinstance(arg, Exception): f.set_exception(arg) else: f.set_result(arg) return future_wrapper
@classmethod
[docs] def SPAWN_WITH_LOOP(cls, loop): loop = asyncio.get_event_loop() if loop is None else loop def spawn(f): if not asyncio.iscoroutinefunction(f): raise TypeError("must be coroutine, got {!r}".format(f)) def wrapper(args, kwargs): asyncio.async(f(*args, **kwargs), loop=loop) return True return wrapper return spawn
@staticmethod def _async_wrapper(f, loop, args, kwargs): if kwargs: functools.partial(f, *args, **kwargs) loop.call_soon(f, *args) return True @staticmethod def _weakref_wrapper(fref, args, kwargs): f = fref() if f is None: return False return not f(*args, **kwargs) @staticmethod def _strong_wrapper(f, args, kwargs): return not f(*args, **kwargs)
[docs] def connect(self, f, mode=None): """ Connect an object `f` to the signal. The type the object needs to have depends on `mode`, but usually it needs to be a callable. :meth:`connect` returns an opaque token which can be used with :meth:`disconnect` to disconnect the object from the signal. The default value for `mode` is :attr:`STRONG`. Any decorator can be used as argument for `mode` and it is applied to `f`. The result is stored internally and is what will be called when the signal is being emitted. If the result of `mode` returns a false value during emission, the connection is removed. .. note:: The return values required by the callable returned by `mode` and the one required by a callable passed to `f` using the predefined modes are complementary! A callable `f` needs to return true to be removed from the connections, while a callable returned by the `mode` decorator needs to return false. Existing modes are listed below. """ mode = mode or self.STRONG self.logger.debug("connecting %r with mode %r", f, mode) return self._connect(mode(f))
def context_connect(self, f, mode=None): """ This returns a *context manager*. When entering the context, `f` is connected to the :class:`AdHocSignal` using `mode`. When leaving the context (no matter whether with or without exception), the connection is disconnected. .. seealso:: The returned object is an instance of :class:`SignalConnectionContext`. """ return SignalConnectionContext(self, f, mode=mode)
[docs] def fire(self, *args, **kwargs): """ Emit the signal, calling all connected objects in-line with the given arguments and in the order they were registered. :class:`AdHocSignal` provides full isolation with respect to exceptions. If a connected listener raises an exception, the other listeners are executed as normal, but the raising listener is removed from the signal. The exception is logged to :attr:`logger` and *not* re-raised, so that the caller of the signal is also not affected. Instead of calling :meth:`fire` explicitly, the ad-hoc signal object itself can be called, too. """ for token, wrapper in list(self._connections.items()): try: keep = wrapper(args, kwargs) except Exception: self.logger.exception("listener attached to signal raised") keep = False if not keep: del self._connections[token]
[docs] def future(self): """ Return a :class:`asyncio.Future` which has been :meth:`connect`\ -ed using :attr:`AUTO_FUTURE`. The token returned by :meth:`connect` is not returned; to remove the future from the signal, just cancel it. """ fut = asyncio.Future() self.connect(fut, self.WEAK) return fut
__call__ = fire
[docs]class SyncAdHocSignal(AbstractAdHocSignal): """ A synchronous ad-hoc signal is like :class:`AdHocSignal`, but for coroutines instead of ordinary callables. .. automethod:: connect .. automethod:: context_connect .. automethod:: fire .. automethod:: disconnect """
[docs] def connect(self, coro): """ The coroutine `coro` is connected to the signal. The coroutine must return a true value, unless it wants to be disconnected from the signal. .. note:: This is different from the return value convention with :attr:`AdHocSignal.STRONG` and :attr:`AdHocSignal.WEAK`. :meth:`connect` returns a token which can be used with :meth:`disconnect` to disconnect the coroutine. """ return self._connect(coro)
[docs] def context_connect(self, coro): """ This returns a *context manager*. When entering the context, `coro` is connected to the :class:`SyncAdHocSignal`. When leaving the context (no matter whether with or without exception), the connection is disconnected. .. seealso:: The returned object is an instance of :class:`SignalConnectionContext`. """ return SignalConnectionContext(self, coro)
@asyncio.coroutine
[docs] def fire(self, *args, **kwargs): """ Emit the signal, calling all coroutines in-line with the given arguments and in the order they were registered. This is obviously a coroutine. Instead of calling :meth:`fire` explicitly, the ad-hoc signal object itself can be called, too. """ for token, coro in list(self._connections.items()): keep = yield from coro(*args, **kwargs) if not keep: del self._connections[token]
__call__ = fire
class SignalConnectionContext: def __init__(self, signal, *args, **kwargs): self._signal = signal self._args = args self._kwargs = kwargs def __enter__(self): try: token = self._signal.connect(*self._args, **self._kwargs) finally: del self._args del self._kwargs self._token = token return token def __exit__(self, exc_type, exc_value, traceback): self._signal.disconnect(self._token) return False class AbstractSignal(metaclass=abc.ABCMeta): def __init__(self, *, doc=None): super().__init__() self.__doc__ = doc self._instances = weakref.WeakKeyDictionary() @abc.abstractclassmethod def make_adhoc_signal(cls): pass def __get__(self, instance, owner): if instance is None: return self try: return self._instances[instance] except KeyError: new = self.make_adhoc_signal() self._instances[instance] = new return new def __set__(self, instance, value): raise AttributeError("cannot override Signal attribute") def __delete__(self, instance): raise AttributeError("cannot override Signal attribute")
[docs]class Signal(AbstractSignal): """ A descriptor which returns per-instance :class:`AdHocSignal` objects on attribute access. Example use: .. code-block:: python class Foo: on_event = Signal() f = Foo() assert isinstance(f.on_event, AdHocSignal) assert f.on_event is f.on_event assert Foo().on_event is not f.on_event """ @classmethod def make_adhoc_signal(cls): return AdHocSignal()
[docs]class SyncSignal(AbstractSignal): """ A descriptor which returns per-instance :class:`SyncAdHocSignal` objects on attribute access. Example use: .. code-block:: python class Foo: on_event = SyncSignal() f = Foo() assert isinstance(f.on_event, SyncAdHocSignal) assert f.on_event is f.on_event assert Foo().on_event is not f.on_event """ @classmethod def make_adhoc_signal(cls): return SyncAdHocSignal()