Source code for aiosasl.statemachine

########################################################################
# File name: statemachine.py
# This file is part of: aiosasl
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
import abc
import typing

from . import common


[docs]class SASLInterface(metaclass=abc.ABCMeta): """ This class serves as an abstract base class for interfaces for use with :class:`SASLStateMachine`. Specific protocols using SASL (such as XMPP, IMAP or SMTP) can subclass this interface to implement SASL on top of the existing protocol. The interface class does not need to implement any state checking. State checking is done by the :class:`SASLStateMachine`. The following interface must be implemented by subclasses. The return values of the methods below are tuples of the following form: * ``(SASLState.SUCCESS, payload)`` -- After successful authentication, success is returned. Depending on the mechanism, a payload (as :class:`bytes` object) may be attached to the result, otherwise, ``payload`` is :data:`None`. * ``(SASLState.CHALLENGE, payload)`` -- A challenge was sent by the server in reply to the previous command. * ``(SASLState.FAILURE, None)`` -- This is only ever returned by :meth:`abort`. All other methods **must** raise errors as :class:`SASLFailure`. .. versionchanged:: 0.4 The first element of the returned tuples are now elements of :class:`SASLState`. For compatibility with previous versions of ``aiosasl`` the first elements of the string may be one of the strings ``"success"``, ``"failure"`` or "``challenge``". For more information see :meth:`SASLState.from_reply`. .. automethod:: initiate .. automethod:: respond .. automethod:: abort """
[docs] @abc.abstractmethod async def initiate( self, mechanism: str, payload: typing.Optional[bytes] = None, ) -> common.NextStateTuple: """ Send a SASL initiation request for the given `mechanism`. Depending on the `mechanism`, an initial `payload` *may* be given. The `payload` is then a :class:`bytes` object which needs to be passed as initial payload during the initiation request. Wait for a reply by the peer and return the reply as a next-state tuple in the format documented at :class:`SASLInterface`. """
[docs] @abc.abstractmethod async def respond( self, payload: bytes, ) -> common.NextStateTuple: """ Send a response to a challenge. The `payload` is a :class:`bytes` object which is to be sent as response. Wait for a reply by the peer and return the reply as a next-state tuple in the format documented at :class:`SASLInterface`. """
[docs] @abc.abstractmethod async def abort(self) -> None: """ Abort the authentication. The result is either the failure tuple (``(SASLState.FAILURE, None)``) or a :class:`SASLFailure` exception if the response from the peer did not indicate abortion (e.g. another error was returned by the peer or the peer indicated success). """
[docs]class SASLStateMachine: """ A state machine to reduce code duplication during SASL handshake. The state methods change the state and return the next client state of the SASL handshake, optionally with server-supplied payload. Note that, with the notable exception of :meth:`abort`, ``failure`` states are never returned but thrown as :class:`SASLFailure` instead. The initial state is never returned. """ def __init__(self, interface: "SASLInterface"): super().__init__() self.interface = interface self._state = common.SASLState.INITIAL async def initiate( self, mechanism: str, payload: typing.Optional[bytes] = None, ) -> common.NextStateTuple: """ Initiate the SASL handshake and advertise the use of the given `mechanism`. If `payload` is not :data:`None`, it will be base64 encoded and sent as initial client response along with the ``<auth />`` element. Return the next state of the state machine as tuple (see :class:`SASLStateMachine` for details). """ if self._state != common.SASLState.INITIAL: raise RuntimeError("initiate has already been called") try: next_state, payload = await self.interface.initiate( mechanism, payload=payload) except common.SASLFailure: self._state = common.SASLState.FAILURE raise next_state = common.SASLState.from_reply(next_state) self._state = next_state return next_state, payload async def response( self, payload: bytes, ) -> common.NextStateTuple: """ Send a response to the previously received challenge, with the given `payload`. The payload is encoded using base64 and transmitted to the server. Return the next state of the state machine as tuple (see :class:`SASLStateMachine` for details). """ if self._state == common.SASLState.SUCCESS_SIMULATE_CHALLENGE: if payload != b"": # XXX: either our mechanism is buggy or the server # sent SASLState.SUCCESS before all challenge-response # messages defined by the mechanism were sent self._state = common.SASLState.FAILURE raise common.SASLFailure( None, "protocol violation: mechanism did not" " respond with an empty response to a" " challenge with final data – this suggests" " a protocol-violating early success from the server." ) self._state = common.SASLState.SUCCESS return common.SASLState.SUCCESS, None if self._state != common.SASLState.CHALLENGE: raise RuntimeError( "no challenge has been made or negotiation failed") try: next_state, response_payload = await self.interface.respond( payload, ) except common.SASLFailure: self._state = common.SASLState.FAILURE raise next_state = common.SASLState.from_reply(next_state) # unfold the (SASLState.SUCCESS, payload) to a sequence of # (SASLState.CHALLENGE, payload), (SASLState.SUCCESS, None) for the # SASLMethod to allow uniform treatment of both cases if (next_state == common.SASLState.SUCCESS and response_payload is not None): self._state = common.SASLState.SUCCESS_SIMULATE_CHALLENGE return common.SASLState.CHALLENGE, response_payload self._state = next_state return next_state, response_payload async def abort(self) -> None: """ Abort an initiated SASL authentication process. The expected result state is ``failure``. """ if self._state == common.SASLState.INITIAL: raise RuntimeError("SASL authentication hasn't started yet") if self._state == common.SASLState.SUCCESS_SIMULATE_CHALLENGE: raise RuntimeError("SASL message exchange already over") try: return await self.interface.abort() finally: self._state = common.SASLState.FAILURE
[docs]class SASLMechanism(metaclass=abc.ABCMeta): """ Implementation of a SASL mechanism. Two methods must be implemented by subclasses: .. automethod:: any_supported .. automethod:: authenticate .. note:: Administrative note Patches for new SASL mechanisms are welcome! """
[docs] @abc.abstractclassmethod def any_supported( cls, mechanisms: typing.Iterable[str], ) -> typing.Any: """ Determine whether this class can perform any SASL mechanism in the set of strings ``mechanisms``. If the class cannot perform any of the SASL mechanisms in ``mechanisms``, it must return :data:`None`. Otherwise, it must return a non-:data:`None` value. Applications must not assign any meaning to any value (except that :data:`None` is a sure indicator that the class cannot perform any of the listed mechanisms) and must not alter any value returned by this function. Note that even :data:`False` indicates success! The return value must be passed as second argument to :meth:`authenticate`. :meth:`authenticate` must not be called with a :data:`None` value. """
[docs] async def authenticate( self, sm: SASLStateMachine, token: typing.Any, ) -> None: """ Execute the mechanism identified by `token` (the non-:data:`None` value which has been returned by :meth:`any_supported` before) using the given :class:`SASLStateMachine` `sm`. If authentication fails, an appropriate exception is raised (:class:`AuthenticationFailure`). If the authentication fails for a reason unrelated to credentials, :class:`SASLFailure` is raised. """