########################################################################
# File name: service.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
import asyncio
# import base64
import collections
import logging
import random
# from datetime import timedelta
import aioxmpp.disco
import aioxmpp.errors
import aioxmpp.disco.xso as disco_xso
import aioxmpp.service
import aioxmpp.structs
from aioxmpp.utils import namespaces
from . import xso as adhoc_xso
_logger = logging.getLogger(__name__)
_rng = random.SystemRandom()
class SessionError(RuntimeError):
pass
class ClientCancelledError(SessionError):
pass
[docs]class AdHocClient(aioxmpp.service.Service):
"""
Access other entities :xep:`50` Ad-Hoc commands.
This service provides helpers to conveniently access and execute :xep:`50`
Ad-Hoc commands.
.. automethod:: supports_commands
.. automethod:: get_commands
.. automethod:: get_command_info
.. automethod:: execute
"""
ORDER_AFTER = [aioxmpp.disco.DiscoClient]
[docs] async def get_commands(self, peer_jid):
"""
Return the list of commands offered by the peer.
:param peer_jid: JID of the peer to query
:type peer_jid: :class:`~aioxmpp.JID`
:rtype: :class:`list` of :class:`~.disco.xso.Item`
:return: List of command items
In the returned list, each :class:`~.disco.xso.Item` represents one
command supported by the peer. The :attr:`~.disco.xso.Item.node`
attribute is the identifier of the command which can be used with
:meth:`get_command_info` and :meth:`execute`.
"""
disco = self.dependencies[aioxmpp.disco.DiscoClient]
response = await disco.query_items(
peer_jid,
node=namespaces.xep0050_commands,
)
return response.items
[docs] async def get_command_info(self, peer_jid, command_name):
"""
Obtain information about a command.
:param peer_jid: JID of the peer to query
:type peer_jid: :class:`~aioxmpp.JID`
:param command_name: Node name of the command
:type command_name: :class:`str`
:rtype: :class:`~.disco.xso.InfoQuery`
:return: Service discovery information about the command
Sends a service discovery query to the service discovery node of the
command. The returned object contains information about the command,
such as the namespaces used by its implementation (generally the
:xep:`4` data forms namespace) and possibly localisations of the
commands name.
The `command_name` can be obtained by inspecting the listing from
:meth:`get_commands` or from well-known command names as defined for
example in :xep:`133`.
"""
disco = self.dependencies[aioxmpp.disco.DiscoClient]
response = await disco.query_info(
peer_jid,
node=command_name,
)
return response
[docs] async def supports_commands(self, peer_jid):
"""
Detect whether a peer supports :xep:`50` Ad-Hoc commands.
:param peer_jid: JID of the peer to query
:type peer_jid: :class:`aioxmpp.JID`
:rtype: :class:`bool`
:return: True if the peer supports the Ad-Hoc commands protocol, false
otherwise.
Note that the fact that a peer supports the protocol does not imply
that it offers any commands.
"""
disco = self.dependencies[aioxmpp.disco.DiscoClient]
response = await disco.query_info(
peer_jid,
)
return namespaces.xep0050_commands in response.features
[docs] async def execute(self, peer_jid, command_name):
"""
Start execution of a command with a peer.
:param peer_jid: JID of the peer to start the command at.
:type peer_jid: :class:`~aioxmpp.JID`
:param command_name: Node name of the command to execute.
:type command_name: :class:`str`
:rtype: :class:`~.adhoc.service.ClientSession`
:return: A started command execution session.
Initialises a client session and starts execution of the command. The
session is returned.
This may raise any exception which may be raised by
:meth:`~.adhoc.service.ClientSession.start`.
"""
session = ClientSession(
self.client.stream,
peer_jid,
command_name,
)
await session.start()
return session
CommandEntry = collections.namedtuple(
"CommandEntry",
[
"name",
"is_allowed",
"handler",
"features",
]
)
class CommandEntry(aioxmpp.disco.StaticNode):
def __init__(self, name, handler, features=set(), is_allowed=None):
super().__init__()
if isinstance(name, str):
self.__name = aioxmpp.structs.LanguageMap({None: name})
else:
self.__name = aioxmpp.structs.LanguageMap(name)
self.__handler = handler
features = set(features) | {namespaces.xep0050_commands}
for feature in features:
self.register_feature(feature)
self.__is_allowed = is_allowed
self.register_identity(
"automation",
"command-node",
names=self.__name
)
@property
def name(self):
return self.__name
@property
def handler(self):
return self.__handler
@property
def is_allowed(self):
return self.__is_allowed
def is_allowed_for(self, *args, **kwargs):
if self.__is_allowed is None:
return True
return self.__is_allowed(*args, **kwargs)
def iter_identities(self, stanza):
if not self.is_allowed_for(stanza.from_):
return iter([])
return super().iter_identities(stanza)
[docs]class AdHocServer(aioxmpp.service.Service, aioxmpp.disco.Node):
"""
Support for serving Ad-Hoc commands.
.. .. automethod:: register_stateful_command
.. automethod:: register_stateless_command
.. automethod:: unregister_command
"""
ORDER_AFTER = [aioxmpp.disco.DiscoServer]
disco_node = aioxmpp.disco.mount_as_node(
"http://jabber.org/protocol/commands"
)
disco_feature = aioxmpp.disco.register_feature(
"http://jabber.org/protocol/commands"
)
def __init__(self, client, **kwargs):
super().__init__(client, **kwargs)
self.register_identity(
"automation", "command-list",
)
self._commands = {}
self._disco = self.dependencies[aioxmpp.disco.DiscoServer]
@aioxmpp.service.iq_handler(aioxmpp.IQType.SET,
adhoc_xso.Command)
async def _handle_command(self, stanza):
try:
info = self._commands[stanza.payload.node]
except KeyError:
raise aioxmpp.errors.XMPPCancelError(
aioxmpp.errors.ErrorCondition.ITEM_NOT_FOUND,
text="no such command: {!r}".format(
stanza.payload.node
)
)
if not info.is_allowed_for(stanza.from_):
raise aioxmpp.errors.XMPPCancelError(
aioxmpp.errors.ErrorCondition.FORBIDDEN,
)
return await info.handler(stanza)
def iter_items(self, stanza):
local_jid = self.client.local_jid
languages = [
aioxmpp.structs.LanguageRange.fromstr("en"),
]
if stanza.lang is not None:
languages.insert(0, aioxmpp.structs.LanguageRange.fromstr(
str(stanza.lang)
))
for node, info in self._commands.items():
if not info.is_allowed_for(stanza.from_):
continue
yield disco_xso.Item(
local_jid,
name=info.name.lookup(languages),
node=node,
)
[docs] def register_stateless_command(self, node, name, handler, *,
is_allowed=None,
features={namespaces.xep0004_data}):
"""
Register a handler for a stateless command.
:param node: Name of the command (``node`` in the service discovery
list).
:type node: :class:`str`
:param name: Human-readable name of the command
:type name: :class:`str` or :class:`~.LanguageMap`
:param handler: Coroutine function to run to get the response for a
request.
:param is_allowed: A predicate which determines whether the command is
shown and allowed for a given peer.
:type is_allowed: function or :data:`None`
:param features: Set of features to announce for the command
:type features: :class:`set` of :class:`str`
When a request for the command is received, `handler` is invoked. The
semantics of `handler` are the same as for
:meth:`~.StanzaStream.register_iq_request_handler`. It must produce a
valid :class:`~.adhoc.xso.Command` response payload.
If `is_allowed` is not :data:`None`, it is invoked whenever a command
listing is generated and whenever a command request is received. The
:class:`aioxmpp.JID` of the requester is passed as positional argument
to `is_allowed`. If `is_allowed` returns false, the command is not
included in the list and attempts to execute it are rejected with
``<forbidden/>`` without calling `handler`.
If `is_allowed` is :data:`None`, the command is always visible and
allowed.
The `features` are returned on a service discovery info request for the
command node. By default, the :xep:`4` (Data Forms) namespace is
included, but this can be overridden by passing a different set without
that feature to `features`.
"""
info = CommandEntry(
name,
handler,
is_allowed=is_allowed,
features=features,
)
self._commands[node] = info
self._disco.mount_node(
node,
info,
)
[docs] def unregister_command(self, node):
"""
Unregister a command previously registered.
:param node: Name of the command (``node`` in the service discovery
list).
:type node: :class:`str`
"""
[docs]class ClientSession:
"""
Represent an Ad-Hoc command session on the client side.
:param stream: The stanza stream over which the session is established.
:type stream: :class:`~.StanzaStream`
:param peer_jid: The full JID of the peer to communicate with
:type peer_jid: :class:`~aioxmpp.JID`
:param command_name: The command to run
:type command_name: :class:`str`
The constructor does not send any stanza, it merely prepares the internal
state. To start the command itself, use the :class:`ClientSession` object
as context manager or call :meth:`start`.
.. note::
The client session returned by :meth:`.AdHocClient.execute` is already
started.
The `command_name` must be one of the :attr:`~.disco.xso.Item.node` values
as returned by :meth:`.AdHocClient.get_commands`.
.. automethod:: start
.. automethod:: proceed
.. automethod:: close
The following attributes change depending on the stage of execution of the
command:
.. autoattribute:: allowed_actions
.. autoattribute:: first_payload
.. autoattribute:: response
.. autoattribute:: status
"""
def __init__(self, stream, peer_jid, command_name, *, logger=None):
super().__init__()
self._stream = stream
self._peer_jid = peer_jid
self._command_name = command_name
self._logger = logger or _logger
self._status = None
self._response = None
@property
def status(self):
"""
The current status of command execution. This is either :data:`None` or
one of the :class:`~.adhoc.CommandStatus` enumeration values.
Initially, this attribute is :data:`None`. After calls to
:meth:`start`, :meth:`proceed` or :meth:`close`, it takes the value of
the :attr:`~.xso.Command.status` attribute of the response.
"""
if self._response is not None:
return self._response.status
return None
@property
def response(self):
"""
The last :class:`~.xso.Command` received from the peer.
This is initially (and after :meth:`close`) :data:`None`.
"""
return self._response
@property
def first_payload(self):
"""
Shorthand to access :attr:`~.xso.Command.first_payload` of the
:attr:`response`.
This is initially (and after :meth:`close`) :data:`None`.
"""
if self._response is not None:
return self._response.first_payload
return None
@property
def sessionid(self):
"""
Shorthand to access :attr:`~.xso.Command.sessionid` of the
:attr:`response`.
This is initially (and after :meth:`close`) :data:`None`.
"""
if self._response is not None:
return self._response.sessionid
return None
@property
def allowed_actions(self):
"""
Shorthand to access :attr:`~.xso.Actions.allowed_actions` of the
:attr:`response`.
If no response has been received yet or if the response specifies no
set of valid actions, this is the minimal set of allowed actions (
:attr:`~.ActionType.EXECUTE` and :attr:`~.ActionType.CANCEL`).
"""
if self._response is not None and self._response.actions is not None:
return self._response.actions.allowed_actions
return {adhoc_xso.ActionType.EXECUTE,
adhoc_xso.ActionType.CANCEL}
[docs] async def start(self):
"""
Initiate the session by starting to execute the command with the peer.
:return: The :attr:`~.xso.Command.first_payload` of the response
This sends an empty command IQ request with the
:attr:`~.ActionType.EXECUTE` action.
The :attr:`status`, :attr:`response` and related attributes get updated
with the newly received values.
"""
if self._response is not None:
raise RuntimeError("command execution already started")
request = aioxmpp.IQ(
type_=aioxmpp.IQType.SET,
to=self._peer_jid,
payload=adhoc_xso.Command(self._command_name),
)
self._response = await self._stream.send_iq_and_wait_for_reply(
request,
)
return self._response.first_payload
[docs] async def proceed(self, *,
action=adhoc_xso.ActionType.EXECUTE,
payload=None):
"""
Proceed command execution to the next stage.
:param action: Action type for proceeding
:type action: :class:`~.ActionTyp`
:param payload: Payload for the request, or :data:`None`
:return: The :attr:`~.xso.Command.first_payload` of the response
`action` must be one of the actions returned by
:attr:`allowed_actions`. It defaults to :attr:`~.ActionType.EXECUTE`,
which is (alongside with :attr:`~.ActionType.CANCEL`) always allowed.
`payload` may be a sequence of XSOs, a single XSO or :data:`None`. If
it is :data:`None`, the XSOs from the request are re-used. This is
useful if you modify the payload in-place (e.g. via
:attr:`first_payload`). Otherwise, the payload on the request is set to
the `payload` argument; if it is a single XSO, it is wrapped in a
sequence.
The :attr:`status`, :attr:`response` and related attributes get updated
with the newly received values.
"""
if self._response is None:
raise RuntimeError("command execution not started yet")
if action not in self.allowed_actions:
raise ValueError("action {} not allowed in this stage".format(
action
))
cmd = adhoc_xso.Command(
self._command_name,
action=action,
payload=self._response.payload if payload is None else payload,
sessionid=self.sessionid,
)
request = aioxmpp.IQ(
type_=aioxmpp.IQType.SET,
to=self._peer_jid,
payload=cmd,
)
try:
self._response = await self._stream.send_iq_and_wait_for_reply(
request,
)
except (aioxmpp.errors.XMPPModifyError,
aioxmpp.errors.XMPPCancelError) as exc:
if isinstance(exc.application_defined_condition,
(adhoc_xso.BadSessionID,
adhoc_xso.SessionExpired)):
await self.close()
raise SessionError(exc.text)
if isinstance(exc, aioxmpp.errors.XMPPCancelError):
await self.close()
raise
return self._response.first_payload
[docs] async def close(self):
if self._response is None:
return
if self.status != adhoc_xso.CommandStatus.COMPLETED:
request = aioxmpp.IQ(
type_=aioxmpp.IQType.SET,
to=self._peer_jid,
payload=adhoc_xso.Command(
self._command_name,
sessionid=self.sessionid,
action=adhoc_xso.ActionType.CANCEL,
)
)
try:
await self._stream.send_iq_and_wait_for_reply(
request,
)
except aioxmpp.errors.StanzaError as exc:
# we are cancelling only out of courtesy.
# if something goes wrong here, it’s barely worth logging
self._logger.debug(
"ignored stanza error during close(): %r",
exc,
)
self._response = None
async def __aenter__(self):
if self._response is None:
await self.start()
return self
async def __aexit__(self, exc_type, exc_value, exc_traceback):
await self.close()