Source code for aioxmpp.adhoc.service

########################################################################
# File name: service.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
import asyncio
# import base64
import collections
import logging
import random

# from datetime import timedelta

import aioxmpp.disco
import aioxmpp.disco.xso as disco_xso
import aioxmpp.service
import aioxmpp.structs

from aioxmpp.utils import namespaces

from . import xso as adhoc_xso


_logger = logging.getLogger(__name__)
_rng = random.SystemRandom()


class SessionError(RuntimeError):
    pass


class ClientCancelledError(SessionError):
    pass


class AdHocClient(aioxmpp.service.Service):
    """
    Access other entities :xep:`50` Ad-Hoc commands.

    This service provides helpers to conveniently access and execute :xep:`50`
    Ad-Hoc commands.

    .. automethod:: supports_commands

    .. automethod:: get_commands

    .. automethod:: get_command_info

    .. automethod:: execute
    """

    ORDER_AFTER = [aioxmpp.disco.DiscoClient]

    @asyncio.coroutine
    def get_commands(self, peer_jid):
        """
        Return the list of commands offered by the peer.

        :param peer_jid: JID of the peer to query
        :type peer_jid: :class:`~aioxmpp.JID`
        :rtype: :class:`list` of :class:`~.disco.xso.Item`
        :return: List of command items

        In the returned list, each :class:`~.disco.xso.Item` represents one
        command supported by the peer. The :attr:`~.disco.xso.Item.node`
        attribute is the identifier of the command which can be used with
        :meth:`get_command_info` and :meth:`execute`.
        """

        disco = self.dependencies[aioxmpp.disco.DiscoClient]
        response = yield from disco.query_items(
            peer_jid,
            node=namespaces.xep0050_commands,
        )
        return response.items

    @asyncio.coroutine
    def get_command_info(self, peer_jid, command_name):
        """
        Obtain information about a command.

        :param peer_jid: JID of the peer to query
        :type peer_jid: :class:`~aioxmpp.JID`
        :param command_name: Node name of the command
        :type command_name: :class:`str`
        :rtype: :class:`~.disco.xso.InfoQuery`
        :return: Service discovery information about the command

        Sends a service discovery query to the service discovery node of the
        command. The returned object contains information about the command,
        such as the namespaces used by its implementation (generally the
        :xep:`4` data forms namespace) and possibly localisations of the
        commands name.

        The `command_name` can be obtained by inspecting the listing from
        :meth:`get_commands` or from well-known command names as defined for
        example in :xep:`133`.
        """

        disco = self.dependencies[aioxmpp.disco.DiscoClient]
        response = yield from disco.query_info(
            peer_jid,
            node=command_name,
        )
        return response

    @asyncio.coroutine
    def supports_commands(self, peer_jid):
        """
        Detect whether a peer supports :xep:`50` Ad-Hoc commands.

        :param peer_jid: JID of the peer to query
        :type peer_jid: :class:`aioxmpp.JID`
        :rtype: :class:`bool`
        :return: True if the peer supports the Ad-Hoc commands protocol, false
                 otherwise.

        Note that the fact that a peer supports the protocol does not imply
        that it offers any commands.
        """

        disco = self.dependencies[aioxmpp.disco.DiscoClient]
        response = yield from disco.query_info(
            peer_jid,
        )

        return namespaces.xep0050_commands in response.features

    @asyncio.coroutine
    def execute(self, peer_jid, command_name):
        """
        Start execution of a command with a peer.

        :param peer_jid: JID of the peer to start the command at.
        :type peer_jid: :class:`~aioxmpp.JID`
        :param command_name: Node name of the command to execute.
        :type command_name: :class:`str`
        :rtype: :class:`~.adhoc.service.ClientSession`
        :return: A started command execution session.

        Initialises a client session and starts execution of the command. The
        session is returned.

        This may raise any exception which may be raised by
        :meth:`~.adhoc.service.ClientSession.start`.
        """

        session = ClientSession(
            self.client.stream,
            peer_jid,
            command_name,
        )
        yield from session.start()
        return session


CommandEntry = collections.namedtuple(
    "CommandEntry",
    [
        "name",
        "is_allowed",
        "handler",
        "features",
    ]
)


class CommandEntry(aioxmpp.disco.StaticNode):
    def __init__(self, name, handler, features=set(), is_allowed=None):
        super().__init__()
        if isinstance(name, str):
            self.__name = aioxmpp.structs.LanguageMap({None: name})
        else:
            self.__name = aioxmpp.structs.LanguageMap(name)
        self.__handler = handler

        features = set(features) | {namespaces.xep0050_commands}
        for feature in features:
            self.register_feature(feature)

        self.__is_allowed = is_allowed

        self.register_identity(
            "automation",
            "command-node",
            names=self.__name
        )

    @property
    def name(self):
        return self.__name

    @property
    def handler(self):
        return self.__handler

    @property
    def is_allowed(self):
        return self.__is_allowed

    def is_allowed_for(self, *args, **kwargs):
        if self.__is_allowed is None:
            return True
        return self.__is_allowed(*args, **kwargs)

    def iter_identities(self, stanza):
        if not self.is_allowed_for(stanza.from_):
            return iter([])
        return super().iter_identities(stanza)


class AdHocServer(aioxmpp.service.Service, aioxmpp.disco.Node):
    """
    Support for serving Ad-Hoc commands.

    .. .. automethod:: register_stateful_command

    .. automethod:: register_stateless_command

    .. automethod:: unregister_command
    """

    ORDER_AFTER = [aioxmpp.disco.DiscoServer]

    disco_node = aioxmpp.disco.mount_as_node(
        "http://jabber.org/protocol/commands"
    )
    disco_feature = aioxmpp.disco.register_feature(
        "http://jabber.org/protocol/commands"
    )

    def __init__(self, client, **kwargs):
        super().__init__(client, **kwargs)
        self.register_identity(
            "automation", "command-list",
        )

        self._commands = {}
        self._disco = self.dependencies[aioxmpp.disco.DiscoServer]

    @aioxmpp.service.iq_handler(aioxmpp.IQType.SET,
                                adhoc_xso.Command)
    @asyncio.coroutine
    def _handle_command(self, stanza):
        try:
            info = self._commands[stanza.payload.node]
        except KeyError:
            raise aioxmpp.errors.XMPPCancelError(
                (namespaces.stanzas, "item-not-found"),
                text="no such command: {!r}".format(
                    stanza.payload.node
                )
            )

        if not info.is_allowed_for(stanza.from_):
            raise aioxmpp.errors.XMPPCancelError(
                (namespaces.stanzas, "forbidden"),
            )

        return (yield from info.handler(stanza))

    def iter_items(self, stanza):
        local_jid = self.client.local_jid
        languages = [
            aioxmpp.structs.LanguageRange.fromstr("en"),
        ]

        if stanza.lang is not None:
            languages.insert(0, aioxmpp.structs.LanguageRange.fromstr(
                str(stanza.lang)
            ))

        for node, info in self._commands.items():
            if not info.is_allowed_for(stanza.from_):
                continue
            yield disco_xso.Item(
                local_jid,
                name=info.name.lookup(languages),
                node=node,
            )

    # def register_stateful_command(self, node, name, handler, *,
    #                               is_allowed=None,
    #                               features={namespaces.xep0004_data}):
    #     """
    #     Register a handler for a stateful command.

    #     :param node: Name of the command (``node`` in the service discovery
    #                  list).
    #     :type node: :class:`str`
    #     :param name: Human-readable name of the command
    #     :type name: :class:`str` or :class:`~.LanguageMap`
    #     :param handler: Coroutine function to spawn when a new session is
    #                     started.
    #     :param is_allowed: A predicate which determines whether the command is
    #                        shown and allowed for a given peer.
    #     :type is_allowed: function or :data:`None`
    #     :param features: Set of features to announce for the command
    #     :type features: :class:`set` of :class:`str`

    #     Whenever a new session is started, `handler` is invoked with a session
    #     object which allows the handler to communicate with the client. The
    #     details of the session are described at :class:`ServerSession`.

    #     If `is_allowed` is not :data:`None`, it is invoked whenever a command
    #     listing is generated and whenever a command session is about to start.
    #     The :class:`~aioxmpp.JID` of the requester is passed as positional
    #     argument to `is_allowed`. If `is_allowed` returns false, the command
    #     is not included in the list and attempts to execute it are rejected
    #     with ``<forbidden/>`` without calling `handler`.

    #     If `is_allowed` is :data:`None`, the command is always visible and
    #     allowed.

    #     The `features` are returned on a service discovery info request for the
    #     command node. By default, the :xep:`4` (Data Forms) namespace is
    #     included, but this can be overridden by passing a different set without
    #     that feature to `features`.

    #     .. warning::

    #        There is currently no rate-limiting mechanism implemented. It is
    #        trivial for an attacker to exhaust memory by starting a huge amount
    #        of sessions.

    #     """

    def register_stateless_command(self, node, name, handler, *,
                                   is_allowed=None,
                                   features={namespaces.xep0004_data}):
        """
        Register a handler for a stateless command.

        :param node: Name of the command (``node`` in the service discovery
                     list).
        :type node: :class:`str`
        :param name: Human-readable name of the command
        :type name: :class:`str` or :class:`~.LanguageMap`
        :param handler: Coroutine function to run to get the response for a
                        request.
        :param is_allowed: A predicate which determines whether the command is
                           shown and allowed for a given peer.
        :type is_allowed: function or :data:`None`
        :param features: Set of features to announce for the command
        :type features: :class:`set` of :class:`str`

        When a request for the command is received, `handler` is invoked. The
        semantics of `handler` are the same as for
        :meth:`~.StanzaStream.register_iq_request_coro`. It must produce a
        valid :class:`~.adhoc.xso.Command` response payload.

        If `is_allowed` is not :data:`None`, it is invoked whenever a command
        listing is generated and whenever a command request is received. The
        :class:`aioxmpp.JID` of the requester is passed as positional argument
        to `is_allowed`. If `is_allowed` returns false, the command is not
        included in the list and attempts to execute it are rejected with
        ``<forbidden/>`` without calling `handler`.

        If `is_allowed` is :data:`None`, the command is always visible and
        allowed.

        The `features` are returned on a service discovery info request for the
        command node. By default, the :xep:`4` (Data Forms) namespace is
        included, but this can be overridden by passing a different set without
        that feature to `features`.
        """

        info = CommandEntry(
            name,
            handler,
            is_allowed=is_allowed,
            features=features,
        )
        self._commands[node] = info
        self._disco.mount_node(
            node,
            info,
        )

    def unregister_command(self, node):
        """
        Unregister a command previously registered.

        :param node: Name of the command (``node`` in the service discovery
                     list).
        :type node: :class:`str`
        """


[docs]class ClientSession: """ Represent an Ad-Hoc command session on the client side. :param stream: The stanza stream over which the session is established. :type stream: :class:`~.StanzaStream` :param peer_jid: The full JID of the peer to communicate with :type peer_jid: :class:`~aioxmpp.JID` :param command_name: The command to run :type command_name: :class:`str` The constructor does not send any stanza, it merely prepares the internal state. To start the command itself, use the :class:`ClientSession` object as context manager or call :meth:`start`. .. note:: The client session returned by :meth:`.AdHocClient.execute` is already started. The `command_name` must be one of the :attr:`~.disco.xso.Item.node` values as returned by :meth:`.AdHocClient.get_commands`. .. automethod:: start .. automethod:: proceed .. automethod:: close The following attributes change depending on the stage of execution of the command: .. autoattribute:: allowed_actions .. autoattribute:: first_payload .. autoattribute:: response .. autoattribute:: status """ def __init__(self, stream, peer_jid, command_name, *, logger=None): super().__init__() self._stream = stream self._peer_jid = peer_jid self._command_name = command_name self._logger = logger or _logger self._status = None self._response = None @property
[docs] def status(self): """ The current status of command execution. This is either :data:`None` or one of the :class:`~.adhoc.CommandStatus` enumeration values. Initially, this attribute is :data:`None`. After calls to :meth:`start`, :meth:`proceed` or :meth:`close`, it takes the value of the :attr:`~.xso.Command.status` attribute of the response. """ if self._response is not None: return self._response.status return None
@property
[docs] def response(self): """ The last :class:`~.xso.Command` received from the peer. This is initially (and after :meth:`close`) :data:`None`. """ return self._response
@property
[docs] def first_payload(self): """ Shorthand to access :attr:`~.xso.Command.first_payload` of the :attr:`response`. This is initially (and after :meth:`close`) :data:`None`. """ if self._response is not None: return self._response.first_payload return None
@property def sessionid(self): """ Shorthand to access :attr:`~.xso.Command.sessionid` of the :attr:`response`. This is initially (and after :meth:`close`) :data:`None`. """ if self._response is not None: return self._response.sessionid return None @property
[docs] def allowed_actions(self): """ Shorthand to access :attr:`~.xso.Actions.allowed_actions` of the :attr:`response`. If no response has been received yet or if the response specifies no set of valid actions, this is the minimal set of allowed actions ( :attr:`~.ActionType.EXECUTE` and :attr:`~.ActionType.CANCEL`). """ if self._response is not None and self._response.actions is not None: return self._response.actions.allowed_actions return {adhoc_xso.ActionType.EXECUTE, adhoc_xso.ActionType.CANCEL}
@asyncio.coroutine
[docs] def start(self): """ Initiate the session by starting to execute the command with the peer. :return: The :attr:`~.xso.Command.first_payload` of the response This sends an empty command IQ request with the :attr:`~.ActionType.EXECUTE` action. The :attr:`status`, :attr:`response` and related attributes get updated with the newly received values. """ if self._response is not None: raise RuntimeError("command execution already started") request = aioxmpp.IQ( type_=aioxmpp.IQType.SET, to=self._peer_jid, payload=adhoc_xso.Command(self._command_name), ) self._response = yield from self._stream.send_iq_and_wait_for_reply( request, ) return self._response.first_payload
@asyncio.coroutine
[docs] def proceed(self, *, action=adhoc_xso.ActionType.EXECUTE, payload=None): """ Proceed command execution to the next stage. :param action: Action type for proceeding :type action: :class:`~.ActionTyp` :param payload: Payload for the request, or :data:`None` :return: The :attr:`~.xso.Command.first_payload` of the response `action` must be one of the actions returned by :attr:`allowed_actions`. It defaults to :attr:`~.ActionType.EXECUTE`, which is (alongside with :attr:`~.ActionType.CANCEL`) always allowed. `payload` may be a sequence of XSOs, a single XSO or :data:`None`. If it is :data:`None`, the XSOs from the request are re-used. This is useful if you modify the payload in-place (e.g. via :attr:`first_payload`). Otherwise, the payload on the request is set to the `payload` argument; if it is a single XSO, it is wrapped in a sequence. The :attr:`status`, :attr:`response` and related attributes get updated with the newly received values. """ if self._response is None: raise RuntimeError("command execution not started yet") if action not in self.allowed_actions: raise ValueError("action {} not allowed in this stage".format( action )) cmd = adhoc_xso.Command( self._command_name, action=action, payload=self._response.payload if payload is None else payload, sessionid=self.sessionid, ) request = aioxmpp.IQ( type_=aioxmpp.IQType.SET, to=self._peer_jid, payload=cmd, ) try: self._response = \ yield from self._stream.send_iq_and_wait_for_reply( request, ) except (aioxmpp.errors.XMPPModifyError, aioxmpp.errors.XMPPCancelError) as exc: if isinstance(exc.application_defined_condition, (adhoc_xso.BadSessionID, adhoc_xso.SessionExpired)): yield from self.close() raise SessionError(exc.text) if isinstance(exc, aioxmpp.errors.XMPPCancelError): yield from self.close() raise return self._response.first_payload
@asyncio.coroutine
[docs] def close(self): if self._response is None: return if self.status != adhoc_xso.CommandStatus.COMPLETED: request = aioxmpp.IQ( type_=aioxmpp.IQType.SET, to=self._peer_jid, payload=adhoc_xso.Command( self._command_name, sessionid=self.sessionid, action=adhoc_xso.ActionType.CANCEL, ) ) try: yield from self._stream.send_iq_and_wait_for_reply( request, ) except aioxmpp.errors.StanzaError as exc: # we are cancelling only out of courtesy. # if something goes wrong here, it’s barely worth logging self._logger.debug( "ignored stanza error during close(): %r", exc, ) self._response = None
@asyncio.coroutine def __aenter__(self): if self._response is None: yield from self.start() return self @asyncio.coroutine def __aexit__(self, exc_type, exc_value, exc_traceback): yield from self.close() # class ServerSession: # """ # Represent an Ad-Hoc Commands session on the server side. # :param stream: The stanza stream to communicate over # :type stream: :class:`~.stream.StanzaStream` # :param sessionid: Session ID to use for communication # :type sessionid: :class:`str` or :data:`None` # :param timeout: Maximum time to wait for a follow-up from the peer # :type timeout: :class:`datetime.timedelta` # The session knows its session ID and the peer JID and keeps track of client # timeouts as well as the most recent reply. # If `sessionid` is :data:`None`, a random session ID with at least 64 bits # of entropy is generated. # """ # def __init__(self, stream, # sessionid=None, # timeout=timedelta(seconds=60)): # super().__init__() # self.stream = stream # self.peer_jid = peer_jid # if sessionid is None: # sessionid = base64.urlsafe_b64encode( # _rng.getrandbits(64).to_bytes( # 64//8, # "little" # ) # ).rstrip(b"=").decode("ascii") # self.sessionid = sessionid # self.timeout = timeout # self._future = None # @asyncio.coroutine # def handle(self, stanza): # pass # @asyncio.coroutine # def reply(self, payload, status, # *, # actions={adhoc_xso.ActionType.NEXT, # adhoc_xso.ActionType.COMPLETE}, # default_action=adhoc_xso.ActionType.NEXT): # """ # Send a reply to the peer. # :param payload: Payload to send in the reply. # :type payload: :class:`~.XSO` or sequence of :class:`~.XSO` # :param status: Status of the command execution. # :type status: :class:`~.adhoc.CommandStatus` # :param actions: Set of actions allowed now. # :type actions: set of :class:`~.adhoc.xso.ActionType` # :param default_action: The action to assume if the client simply # continues with # :attr:`~.adhoc.xso.ActionType.EXECUTE`. # :type default_action: :class:`~.adhoc.xso.ActionType` member which is # not :attr:`~.adhoc.xso.ActionType.EXECUTE` # :raise ClientCancelledError: if the client cancels the execution # :raise RuntimeError: if the client has not yet sent a request # :raise TimeoutError: if the client does not send a follow-up message in # time # :return: The chosen action and the payload given by the peer. # :rtype: Pair of :class:`~.adhoc.xso.ActionType` and sequence of # :class:`~.XSO` objects. # Send a reply to the peer. The `payload` must be a single # :class:`~.XSO` or a sequence of :class:`~.XSO` objects. A single # :class:`~.XSO` is wrapped in a sequence. The sequence is used as # payload for the Ad-Hoc Command response. # `status` informs the client about the current status of execution. For # all but the last reply, this should be # :attr:`~.adhoc.CommandStatus.EXECUTING`, but it should be set to # :attr:`~.adhoc.CommandStatus.COMPLETED` on the last reply. # Unfortunately, we do not have a sensible way to infer this, which is # why there is no default for this argument. # `actions` is the set of actions allowed for the client. The default is # to allow the :attr:`~.adhoc.xso.ActionType.NEXT` and # :attr:`~.adhoc.xso.ActionType.COMPLETE` actions. Depending on your # application, you may need a different set of actions. You do not need # to specify the :attr:`~.adhoc.xso.ActionType.EXECUTE` or # :attr:`~.adhoc.xso.ActionType.CANCEL` action types, which are # implicitly allowed. # `default_action` is the action which is assumed when the client simply # specifies the :attr:`~.adhoc.xso.ActionType.EXECUTE` action. It # defaults to :attr:`~.adhoc.xso.ActionType.NEXT` and *must* be included # in the `actions` set. # The response from the client is generally returned a as a tuple # consisting of the action chosen by the client and the payload sent by # the client. # If the client chooses the :attr:`~.adhoc.xso.ActionType.CANCEL` action, # a :class:`RuntimeError` exception is raised and a confirmation of # cancellation is sent to the client automatically and the session is # closed. # If the client does not answer within the timeout, :class:`TimeoutError` # is raised and the session is closed. # """