Source code for aioxmpp.xso.query

import abc
import copy
import itertools
import inspect
import operator


class _SoftExprMixin:
    """
    This mixin is used for metaclasses and descriptors.

    It defines the operators ``/`` and ``[]``, which are rarely used for either
    classes or descriptors.

    .. seealso::

       :class:`_ExprMixin`
          which inherits from this class and defines more operators, some of
          which would be unsafe to implement on classes or descriptors, such as
          ``==``.

    """

    def __truediv__(self, other):
        if isinstance(other, PreExpr):
            return as_expr(other, lhs=self)
        elif isinstance(other, Expr):
            return as_expr(other, lhs=self)
        return NotImplemented

    def __getitem__(self, index):
        if isinstance(index, where):
            return ExprFilter(self, as_expr(index.expr))
        return Nth(self, as_expr(index))


class _ExprMixin(_SoftExprMixin):
    """
    This mixin defines operators which are only "safe" to overload in
    constrained situations. These operators often have meanings and may be
    implicitly used by the python language; thus, they are only defined on
    :class:`Expr` subclasses and some :class:`PreExpr` subclasses.

    The defined operators currently are:

    * Comparison: ``==``, ``<``, ``<=``, ``>=``, ``>``, ``!=``
    """

    def __eq__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.eq,
        )

    def __ne__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.ne,
        )

    def __lt__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.lt,
        )

    def __gt__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.gt,
        )

    def __ge__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.ge,
        )

    def __le__(self, other):
        return CmpOp(
            as_expr(self),
            as_expr(other),
            operator.le,
        )


[docs]class EvaluationContext: """ The evaluation context holds contextual information for the evaluation of a query expression. Most notably, it provides the methods for acquiring and replacing the toplevel objects of classes: .. automethod:: get_toplevel_object() .. automethod:: set_toplevel_object() In addition, it provides shortcuts for evaluating expressions: .. automethod:: eval .. automethod:: eval_bool """ def __init__(self, *args, **kwargs): super().__init__() self._toplevels = {} def __copy__(self): result = type(self).__new__(type(self)) result._toplevels = dict(self._toplevels) return result
[docs] def get_toplevel_object(self, class_): """ Return the toplevel object for the given `class_`. Only exact matches are returned. """ return self._toplevels[class_]
[docs] def set_toplevel_object(self, instance, class_=None): """ Set the toplevel object to return from :meth:`get_toplevel_object` when asked for `class_` to `instance`. If `class_` is :data:`None`, the :func:`type` of the `instance` is used. """ if class_ is None: class_ = type(instance) self._toplevels[class_] = instance
[docs] def eval(self, expr): """ Evaluate the expression `expr` and return the result. The result of an expression is always an iterable. """ return expr.eval(self)
[docs] def eval_bool(self, expr): """ Evaluate the expression `expr` and return the truthness of its result. A result of an expression is said to be true if it contains at least one value. It has the same semantics as :func:`bool` on sequences.s """ result = expr.eval(self) iterator = iter(result) try: next(iterator) except StopIteration: return False else: return True finally: if hasattr(iterator, "close"): iterator.close()
class Expr(_ExprMixin, metaclass=abc.ABCMeta): """ Base class for things which are solely expressions and nothing else. """ @abc.abstractmethod def eval(self, ec): pass def eval_leaf(self, ec): result = self.eval(ec) if inspect.isgenerator(result): return list(result) return result def __repr__(self): return "<{}.{} {!r}>".format( type(self).__module__, type(self).__qualname__, self.__dict__, ) class ContextInstance(Expr): def __init__(self, class_, **kwargs): super().__init__(**kwargs) self.class_ = class_ def eval(self, ec): """ Retrieve the current toplevel instance of `class_` from the :class:`EvaluationContext`. ` """ try: return [ec.get_toplevel_object(self.class_)] except KeyError: return [] class GetDescriptor(Expr): """ Represents a descriptor bound to a class. As an expression, it represents the query for all values of the `descriptor` on an all instances of `class_` in the result set of `expr`. """ def __init__(self, expr, descriptor): super().__init__() self.expr = expr self.descriptor = descriptor def new_values(self): return [] def update_values(self, v, vnew): v.append(vnew) def eval(self, ec): vs = self.new_values() for instance in self.expr.eval(ec): try: vnew = self.descriptor.__get__(instance, type(instance)) except AttributeError: continue self.update_values( vs, vnew ) return vs class GetMappingDescriptor(GetDescriptor): def __init__(self, expr, descriptor, mapping_factory=dict, **kwargs): super().__init__(expr, descriptor, **kwargs) self.mapping_factory = mapping_factory def new_values(self): return self.mapping_factory() def update_values(self, v, vnew): v.update(vnew) class GetSequenceDescriptor(GetDescriptor): def __init__(self, expr, descriptor, sequence_factory=list, **kwargs): super().__init__(expr, descriptor, **kwargs) self.sequence_factory = sequence_factory def new_values(self): return self.sequence_factory() def update_values(self, v, vnew): v.extend(vnew) class GetInstances(Expr): def __init__(self, expr, class_): super().__init__() self.expr = expr self.class_ = class_ def eval(self, ec): for obj in self.expr.eval(ec): if isinstance(obj, self.class_): yield obj class Nth(Expr): def __init__(self, expr, nth_expr): super().__init__() self.expr = expr self.nth_expr = nth_expr def eval(self, ec): n, = self.nth_expr.eval(ec) iterable = self.expr.eval(ec) if isinstance(n, slice): return itertools.islice( iterable, n.start, n.stop, n.step, ) return itertools.islice( self.expr.eval(ec), n, n+1, ) class ExprFilter(Expr): def __init__(self, expr, filter_expr): super().__init__() self.expr = expr self.filter_expr = filter_expr def eval(self, ec): for value in self.expr.eval(ec): sub_ec = copy.copy(ec) sub_ec.set_toplevel_object(value) filter_result = sub_ec.eval_bool(self.filter_expr) if filter_result: yield value class where: """ Wrap the expression `expr` so that it can be used as a filter in ``[]``. """ def __init__(self, expr): self.expr = expr class _BoolOpMixin: def eval(self, ec): if self.eval_leaf(ec): yield True class CmpOp(_BoolOpMixin, Expr): def __init__(self, operand1, operand2, operator): super().__init__() self.operand1 = operand1 self.operand2 = operand2 self.operator = operator def eval_leaf(self, ec): vs1 = self.operand1.eval_leaf(ec) vs2 = self.operand2.eval_leaf(ec) for v1 in vs1: for v2 in vs2: if self.operator(v1, v2): return True return False class NotOp(_BoolOpMixin, Expr): def __init__(self, operand): super().__init__() self.operand = operand def eval_leaf(self, ec): return not ec.eval_bool(self.operand) def not_(expr): """ Return the boolean-not of the value of `expr`. A expression value is true if it contains at least one element and false otherwise. .. seealso:: :meth:`EvaluationContext.eval_bool` which is used behind the scenes to calculate the boolean value of `expr`. :class:`NotOp` which actually implements the operator. """ return NotOp(as_expr(expr)) class Constant(Expr): def __init__(self, value): super().__init__() self.value = value def eval(self, ec): return [self.value] # Here be dragons: if you use metaclass=abc.ABCMeta with this class, very # interesting things will blow up class PreExpr(_SoftExprMixin): @abc.abstractmethod def xq_instantiate(self, expr=None): pass class Class(PreExpr): def xq_instantiate(self, expr=None): if expr is None: return ContextInstance(self) return GetInstances(expr, self) class BoundDescriptor(_ExprMixin, PreExpr): def __init__(self, class_, descriptor, expr_class, expr_kwargs={}, **kwargs): super().__init__(**kwargs) self.xq_xso_class = class_ self.xq_descriptor = descriptor self.xq_expr_class = expr_class self.xq_expr_kwargs = expr_kwargs def xq_instantiate(self, expr=None): return self.xq_expr_class( self.xq_xso_class.xq_instantiate(expr), self.xq_descriptor, **self.xq_expr_kwargs ) def __getattr__(self, name): try: return super().__getattr__(name) except AttributeError: if not name.startswith("xq_"): return getattr(self.xq_descriptor, name) raise def as_expr(thing, lhs=None): if isinstance(thing, Expr): if hasattr(thing, "expr"): thing.expr = as_expr(thing.expr, lhs=lhs) return thing if isinstance(thing, PreExpr): return thing.xq_instantiate(lhs) return Constant(thing)