########################################################################
# File name: callbacks.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
"""
:mod:`~aioxmpp.callbacks` -- Synchronous and asynchronous callbacks
###################################################################
This module provides facilities for objects to provide signals to which other
objects can connect.
Descriptor vs. ad-hoc
=====================
Descriptors can be used as class attributes and will create ad-hoc signals
dynamically for each instance. They are the most commonly used:
.. code-block:: python
class Emitter:
on_event = callbacks.Signal()
def handler():
pass
emitter1 = Emitter()
emitter2 = Emitter()
emitter1.on_event.connect(handler)
emitter1.on_event() # calls `handler`
emitter2.on_event() # does not call `handler`
# the actual signals are distinct
assert emitter1.on_event is not emitter2.on_event
Ad-hoc signals are useful for testing and are the type of which the actual
fields are.
Signal overview
===============
.. autosummary::
Signal
SyncSignal
AdHocSignal
SyncAdHocSignal
Utilities
---------
.. autofunction:: first_signal
Signal descriptors
------------------
These descriptors can be used on classes to have attributes which are signals:
.. autoclass:: Signal
.. autoclass:: SyncSignal
Signal implementations (ad-hoc signals)
---------------------------------------
Whenever accessing an attribute using the :class:`Signal` or
:class:`SyncSignal` descriptors, an object of one of the following classes is
returned. This is where the behaviour of the signals is specified.
.. autoclass:: AdHocSignal
.. autoclass:: SyncAdHocSignal
Filters
=======
.. autoclass:: Filter
"""
import abc
import asyncio
import collections
import contextlib
import functools
import logging
import types
import weakref
logger = logging.getLogger(__name__)
def log_spawned(logger, fut):
try:
result = fut.result()
except asyncio.CancelledError:
logger.debug("spawned task was cancelled")
except: # NOQA
logger.warning("spawned task raised exception", exc_info=True)
else:
if result is not None:
logger.info("value returned by spawned task was ignored: %r",
result)
class TagListener:
def __init__(self, ondata, onerror=None):
self._ondata = ondata
self._onerror = onerror
def data(self, data):
return self._ondata(data)
def error(self, exc):
if self._onerror is not None:
return self._onerror(exc)
def is_valid(self):
return True
class AsyncTagListener(TagListener):
def __init__(self, ondata, onerror=None, *, loop=None):
super().__init__(ondata, onerror)
self._loop = loop or asyncio.get_event_loop()
def data(self, data):
self._loop.call_soon(self._ondata, data)
def error(self, exc):
if self._onerror is not None:
self._loop.call_soon(self._onerror, exc)
class OneshotTagListener(TagListener):
def __init__(self, ondata, onerror=None, **kwargs):
super().__init__(ondata, onerror=onerror, **kwargs)
self._cancelled = False
def data(self, data):
super().data(data)
return True
def error(self, exc):
super().error(exc)
return True
def cancel(self):
self._cancelled = True
def is_valid(self):
return not self._cancelled and super().is_valid()
class OneshotAsyncTagListener(OneshotTagListener, AsyncTagListener):
pass
class FutureListener:
def __init__(self, fut):
self.fut = fut
def data(self, data):
try:
self.fut.set_result(data)
except asyncio.InvalidStateError:
pass
return True
def error(self, exc):
try:
self.fut.set_exception(exc)
except asyncio.InvalidStateError:
pass
return True
def is_valid(self):
return not self.fut.done()
class TagDispatcher:
def __init__(self):
self._listeners = {}
def add_callback(self, tag, fn):
return self.add_listener(tag, TagListener(fn))
def add_callback_async(self, tag, fn, *, loop=None):
return self.add_listener(
tag,
AsyncTagListener(fn, loop=loop)
)
def add_future(self, tag, fut):
return self.add_listener(
tag,
FutureListener(fut)
)
def add_listener(self, tag, listener):
try:
existing = self._listeners[tag]
if not existing.is_valid():
raise KeyError()
except KeyError:
self._listeners[tag] = listener
else:
raise ValueError("only one listener is allowed per tag")
def unicast(self, tag, data):
cb = self._listeners[tag]
if not cb.is_valid():
del self._listeners[tag]
self._listeners[tag]
if cb.data(data):
del self._listeners[tag]
def unicast_error(self, tag, exc):
cb = self._listeners[tag]
if not cb.is_valid():
del self._listeners[tag]
self._listeners[tag]
if cb.error(exc):
del self._listeners[tag]
def remove_listener(self, tag):
del self._listeners[tag]
def broadcast_error(self, exc):
for tag, listener in list(self._listeners.items()):
if listener.is_valid() and listener.error(exc):
del self._listeners[tag]
def close_all(self, exc):
self.broadcast_error(exc)
self._listeners.clear()
class AbstractAdHocSignal:
def __init__(self):
super().__init__()
self._connections = collections.OrderedDict()
self.logger = logger
def _connect(self, wrapper):
token = object()
self._connections[token] = wrapper
return token
def disconnect(self, token):
"""
Disconnect the connection identified by `token`. This never raises,
even if an invalid `token` is passed.
"""
try:
del self._connections[token]
except KeyError:
pass
[docs]class AdHocSignal(AbstractAdHocSignal):
"""
An ad-hoc signal is a single emitter. This is where callables are connected
to, using the :meth:`connect` method of the :class:`AdHocSignal`.
.. automethod:: fire
.. automethod:: connect
.. automethod:: context_connect
.. automethod:: future
.. attribute:: logger
This may be a :class:`logging.Logger` instance to allow the signal to
log errors and debug events to a specific logger instead of the default
logger (``aioxmpp.callbacks``).
This attribute must not be :data:`None`, and it is initialised to the
default logger on creation of the :class:`AdHocSignal`.
The different ways callables can be connected to an ad-hoc signal are shown
below:
.. attribute:: STRONG
Connections using this mode keep a strong reference to the callable. The
callable is called directly, thus blocking the emission of the signal.
.. attribute:: WEAK
Connections using this mode keep a weak reference to the callable. The
callable is executed directly, thus blocking the emission of the signal.
If the weak reference is dead, it is automatically removed from the
signals connection list. If the callable is a bound method,
:class:`weakref.WeakMethod` is used automatically.
For both :attr:`STRONG` and :attr:`WEAK` holds: if the callable returns a
true value, it is disconnected from the signal.
.. classmethod:: ASYNC_WITH_LOOP(loop)
This mode requires an :mod:`asyncio` event loop as argument. When the
signal is emitted, the callable is not called directly. Instead, it is
enqueued for calling with the event loop using
:meth:`asyncio.BaseEventLoop.call_soon`. If :data:`None` is passed as
`loop`, the loop is obtained from :func:`asyncio.get_event_loop` at
connect time.
A strong reference is held to the callable.
Connections using this mode are never removed automatically from the
signals connection list. You have to use :meth:`disconnect` explicitly.
.. attribute:: AUTO_FUTURE
Instead of a callable, a :class:`asyncio.Future` must be passed when
using this mode.
This mode can only be used for signals which send at most one
argument. If no argument is sent, the :meth:`~asyncio.Future.set_result`
method is called with :data:`None`.
If one argument is sent and it is an instance of :class:`Exception`, it
is passed to :meth:`~asyncio.Future.set_exception`. Otherwise, if one
argument is sent, it is passed to
:meth:`~asyncio.Future.set_exception`.
In any case, the future is removed after the next emission of the
signal.
.. classmethod:: SPAWN_WITH_LOOP(loop)
This mode requires an :mod:`asyncio` event loop as argument and a
coroutine to be passed to :meth:`connect`. If :data:`None` is passed as
`loop`, the loop is obtained from :func:`asyncio.get_event_loop` at
connect time.
When the signal is emitted, the coroutine is spawned using
:func:`asyncio.ensure_future` in the given `loop`, with the arguments
passed to the signal.
A strong reference is held to the coroutine.
Connections using this mode are never removed automatically from the
signals connection list. You have to use :meth:`disconnect` explicitly.
If the spawned coroutine returns with an exception or a non-:data:`None`
return value, a message is logged, with the following log levels:
* Return with non-:data:`None` value: :data:`logging.INFO`
* Raises :class:`asyncio.CancelledError`: :data:`logging.DEBUG`
* Raises any other exception: :data:`logging.WARNING`
.. versionadded:: 0.6
.. automethod:: disconnect
"""
[docs] @classmethod
def STRONG(cls, f):
if not hasattr(f, "__call__"):
raise TypeError("must be callable, got {!r}".format(f))
return functools.partial(cls._strong_wrapper, f)
[docs] @classmethod
def ASYNC_WITH_LOOP(cls, loop):
if loop is None:
loop = asyncio.get_event_loop()
def create_wrapper(f):
if not hasattr(f, "__call__"):
raise TypeError("must be callable, got {!r}".format(f))
return functools.partial(cls._async_wrapper,
f,
loop)
return create_wrapper
[docs] @classmethod
def WEAK(cls, f):
if not hasattr(f, "__call__"):
raise TypeError("must be callable, got {!r}".format(f))
if isinstance(f, types.MethodType):
ref = weakref.WeakMethod(f)
else:
ref = weakref.ref(f)
return functools.partial(cls._weakref_wrapper, ref)
[docs] @classmethod
def AUTO_FUTURE(cls, f):
def future_wrapper(args, kwargs):
if kwargs:
raise TypeError("keyword arguments not supported")
if len(args) > 0:
try:
arg, = args
except ValueError:
raise TypeError("too many arguments") from None
else:
arg = None
if f.done():
return
if isinstance(arg, Exception):
f.set_exception(arg)
else:
f.set_result(arg)
return future_wrapper
[docs] @classmethod
def SPAWN_WITH_LOOP(cls, loop):
loop = asyncio.get_event_loop() if loop is None else loop
def spawn(f):
if not asyncio.iscoroutinefunction(f):
raise TypeError("must be coroutine, got {!r}".format(f))
def wrapper(args, kwargs):
task = asyncio.ensure_future(f(*args, **kwargs), loop=loop)
task.add_done_callback(
functools.partial(
log_spawned,
logger,
)
)
return True
return wrapper
return spawn
@staticmethod
def _async_wrapper(f, loop, args, kwargs):
if kwargs:
functools.partial(f, *args, **kwargs)
loop.call_soon(f, *args)
return True
@staticmethod
def _weakref_wrapper(fref, args, kwargs):
f = fref()
if f is None:
return False
return not f(*args, **kwargs)
@staticmethod
def _strong_wrapper(f, args, kwargs):
return not f(*args, **kwargs)
[docs] def connect(self, f, mode=None):
"""
Connect an object `f` to the signal. The type the object needs to have
depends on `mode`, but usually it needs to be a callable.
:meth:`connect` returns an opaque token which can be used with
:meth:`disconnect` to disconnect the object from the signal.
The default value for `mode` is :attr:`STRONG`. Any decorator can be
used as argument for `mode` and it is applied to `f`. The result is
stored internally and is what will be called when the signal is being
emitted.
If the result of `mode` returns a false value during emission, the
connection is removed.
.. note::
The return values required by the callable returned by `mode` and
the one required by a callable passed to `f` using the predefined
modes are complementary!
A callable `f` needs to return true to be removed from the
connections, while a callable returned by the `mode` decorator needs
to return false.
Existing modes are listed below.
"""
mode = mode or self.STRONG
self.logger.debug("connecting %r with mode %r", f, mode)
return self._connect(mode(f))
[docs] def context_connect(self, f, mode=None):
"""
This returns a *context manager*. When entering the context, `f` is
connected to the :class:`AdHocSignal` using `mode`. When leaving the
context (no matter whether with or without exception), the connection
is disconnected.
.. seealso::
The returned object is an instance of
:class:`SignalConnectionContext`.
"""
return SignalConnectionContext(self, f, mode=mode)
[docs] def fire(self, *args, **kwargs):
"""
Emit the signal, calling all connected objects in-line with the given
arguments and in the order they were registered.
:class:`AdHocSignal` provides full isolation with respect to
exceptions. If a connected listener raises an exception, the other
listeners are executed as normal, but the raising listener is removed
from the signal. The exception is logged to :attr:`logger` and *not*
re-raised, so that the caller of the signal is also not affected.
Instead of calling :meth:`fire` explicitly, the ad-hoc signal object
itself can be called, too.
"""
for token, wrapper in list(self._connections.items()):
try:
keep = wrapper(args, kwargs)
except Exception:
self.logger.exception("listener attached to signal raised")
keep = False
if not keep:
del self._connections[token]
[docs] def future(self):
"""
Return a :class:`asyncio.Future` which has been :meth:`connect`\\ -ed
using :attr:`AUTO_FUTURE`.
The token returned by :meth:`connect` is not returned; to remove the
future from the signal, just cancel it.
"""
fut = asyncio.Future()
self.connect(fut, self.AUTO_FUTURE)
return fut
__call__ = fire
[docs]class SyncAdHocSignal(AbstractAdHocSignal):
"""
A synchronous ad-hoc signal is like :class:`AdHocSignal`, but for
coroutines instead of ordinary callables.
.. automethod:: connect
.. automethod:: context_connect
.. automethod:: fire
.. automethod:: disconnect
"""
[docs] def connect(self, coro):
"""
The coroutine `coro` is connected to the signal. The coroutine must
return a true value, unless it wants to be disconnected from the
signal.
.. note::
This is different from the return value convention with
:attr:`AdHocSignal.STRONG` and :attr:`AdHocSignal.WEAK`.
:meth:`connect` returns a token which can be used with
:meth:`disconnect` to disconnect the coroutine.
"""
self.logger.debug("connecting %r", coro)
return self._connect(coro)
[docs] def context_connect(self, coro):
"""
This returns a *context manager*. When entering the context, `coro` is
connected to the :class:`SyncAdHocSignal`. When leaving the context (no
matter whether with or without exception), the connection is
disconnected.
.. seealso::
The returned object is an instance of
:class:`SignalConnectionContext`.
"""
return SignalConnectionContext(self, coro)
[docs] @asyncio.coroutine
def fire(self, *args, **kwargs):
"""
Emit the signal, calling all coroutines in-line with the given
arguments and in the order they were registered.
This is obviously a coroutine.
Instead of calling :meth:`fire` explicitly, the ad-hoc signal object
itself can be called, too.
"""
for token, coro in list(self._connections.items()):
keep = yield from coro(*args, **kwargs)
if not keep:
del self._connections[token]
__call__ = fire
class SignalConnectionContext:
def __init__(self, signal, *args, **kwargs):
self._signal = signal
self._args = args
self._kwargs = kwargs
def __enter__(self):
try:
token = self._signal.connect(*self._args, **self._kwargs)
finally:
del self._args
del self._kwargs
self._token = token
return token
def __exit__(self, exc_type, exc_value, traceback):
self._signal.disconnect(self._token)
return False
class AbstractSignal(metaclass=abc.ABCMeta):
def __init__(self, *, doc=None):
super().__init__()
self.__doc__ = doc
self._instances = weakref.WeakKeyDictionary()
@abc.abstractclassmethod
def make_adhoc_signal(cls):
pass
def __get__(self, instance, owner):
if instance is None:
return self
try:
return self._instances[instance]
except KeyError:
new = self.make_adhoc_signal()
self._instances[instance] = new
return new
def __set__(self, instance, value):
raise AttributeError("cannot override Signal attribute")
def __delete__(self, instance):
raise AttributeError("cannot override Signal attribute")
[docs]class Signal(AbstractSignal):
"""
A descriptor which returns per-instance :class:`AdHocSignal` objects on
attribute access.
Example use:
.. code-block:: python
class Foo:
on_event = Signal()
f = Foo()
assert isinstance(f.on_event, AdHocSignal)
assert f.on_event is f.on_event
assert Foo().on_event is not f.on_event
"""
@classmethod
def make_adhoc_signal(cls):
return AdHocSignal()
[docs]class SyncSignal(AbstractSignal):
"""
A descriptor which returns per-instance :class:`SyncAdHocSignal` objects on
attribute access.
Example use:
.. code-block:: python
class Foo:
on_event = SyncSignal()
f = Foo()
assert isinstance(f.on_event, SyncAdHocSignal)
assert f.on_event is f.on_event
assert Foo().on_event is not f.on_event
"""
@classmethod
def make_adhoc_signal(cls):
return SyncAdHocSignal()
[docs]class Filter:
"""
A filter chain for arbitrary data.
This is used for example in :class:`~.stream.StanzaStream` to allow
services and applications to filter inbound and outbound stanzas.
Each function registered with the filter receives at least one argument.
This argument is the object which is to be filtered. The function must
return the object, a replacement or :data:`None`. If :data:`None` is
returned, the filter chain aborts and further functions are not called.
Otherwise, the next function is called with the result of the previous
function until the filter chain is complete.
Other arguments passed to :meth:`filter` are passed unmodified to each
function called; only the first argument is subject to filtering.
.. versionchanged:: 0.9
This class was formerly available at :class:`aioxmpp.stream.Filter`.
.. automethod:: register
.. automethod:: filter
.. automethod:: unregister
.. automethod:: context_register(func[, order])
"""
class Token:
def __str__(self):
return "<{}.{} 0x{:x}>".format(
type(self).__module__,
type(self).__qualname__,
id(self))
def __init__(self):
super().__init__()
self._filter_order = []
[docs] def register(self, func, order):
"""
Add a function to the filter chain.
:param func: A callable which is to be added to the filter chain.
:param order: An object indicating the ordering of the function
relative to the others.
:return: Token representing the registration.
Register the function `func` as a filter into the chain. `order` must
be a value which is used as a sorting key to order the functions
registered in the chain.
The type of `order` depends on the use of the filter, as does the
number of arguments and keyword arguments which `func` must accept.
This will generally be documented at the place where the
:class:`Filter` is used.
Functions with the same order are sorted in the order of their
addition, with the function which was added earliest first.
Remember that all values passed to `order` which are registered at the
same time in the same :class:`Filter` need to be totally orderable with
respect to each other.
The returned token can be used to :meth:`unregister` a filter.
"""
token = self.Token()
self._filter_order.append((order, token, func))
self._filter_order.sort(key=lambda x: x[0])
return token
[docs] def filter(self, obj, *args, **kwargs):
"""
Filter the given object through the filter chain.
:param obj: The object to filter
:param args: Additional arguments to pass to each filter function.
:param kwargs: Additional keyword arguments to pass to each filter
function.
:return: The filtered object or :data:`None`
See the documentation of :class:`Filter` on how filtering operates.
Returns the object returned by the last function in the filter chain or
:data:`None` if any function returned :data:`None`.
"""
for _, _, func in self._filter_order:
obj = func(obj, *args, **kwargs)
if obj is None:
return None
return obj
[docs] def unregister(self, token_to_remove):
"""
Unregister a filter function.
:param token_to_remove: The token as returned by :meth:`register`.
Unregister a function from the filter chain using the token returned by
:meth:`register`.
"""
for i, (_, token, _) in enumerate(self._filter_order):
if token == token_to_remove:
break
else:
raise ValueError("unregistered token: {!r}".format(
token_to_remove))
del self._filter_order[i]
[docs] @contextlib.contextmanager
def context_register(self, func, *args):
"""
:term:`Context manager <context manager>` which temporarily registers a
filter function.
:param func: The filter function to register.
:param order: The sorting key for the filter function.
:rtype: :term:`context manager`
:return: Context manager which temporarily registers the filter
function.
If :meth:`register` does not require `order` because it has been
overridden in a subclass, the `order` argument can be omitted here,
too.
.. versionadded:: 0.9
"""
token = self.register(func, *args)
try:
yield
finally:
self.unregister(token)
[docs]def first_signal(*signals):
"""
Connect to multiple signals and wait for the first to emit.
:param signals: Signals to connect to.
:type signals: :class:`AdHocSignal`
:return: An awaitable for the first signal to emit.
The awaitable returns the first argument passed to the signal. If the first
argument is an exception, the exception is re-raised from the awaitable.
A common use-case is a situation where a class exposes a "on_finished" type
signal and an "on_failure" type signal. :func:`first_signal` can be used
to combine those nicely::
# e.g. a aioxmpp.im.conversation.AbstractConversation
conversation = ...
await first_signal(
# emits without arguments when the conversation is successfully
# entered
conversation.on_enter,
# emits with an exception when entering the conversation fails
conversation.on_failure,
)
# await first_signal(...) will either raise an exception (failed) or
# return None (success)
.. warning::
Only works with signals which emit with zero or one argument. Signals
which emit with more than one argument or with keyword arguments are
silently ignored! (Thus, if only such signals are connected, the
future will never complete.)
(This is a side-effect of the implementation of
:meth:`AdHocSignal.AUTO_FUTURE`).
.. note::
Does not work with coroutine signals (:class:`SyncAdHocSignal`).
"""
fut = asyncio.Future()
for signal in signals:
signal.connect(fut, signal.AUTO_FUTURE)
return fut