# Source code for aiosasl.common
########################################################################
# File name: common.py
# This file is part of: aiosasl
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
import enum
import typing
class SASLError(Exception):
    """
    Base class for a SASL related error.

    `opaque_error` may be anything but :data:`None` which helps your
    application re-identify the error at the outer layers. `kind` is a
    string which helps identifying the class of the error; this is set
    implicitly by the constructors of :class:`SASLFailure` and
    :class:`AuthenticationFailure`, which you are encouraged to use.

    `text` may be a human-readable string describing the error condition
    in more detail.

    `opaque_error` is set to :data:`None` by :class:`SASLMechanism`
    implementations to indicate errors which originate from the local
    mechanism implementation.

    The exception message is ``"<opaque_error>: <kind>"``, followed by
    ``": <text>"`` if `text` is non-empty.

    .. attribute:: opaque_error

       The value passed to the respective constructor argument.

    .. attribute:: text

       The value passed to the respective constructor argument.
    """

    def __init__(
            self,
            opaque_error: typing.Any,
            kind: str,
            text: typing.Optional[str] = None) -> None:
        msg = "{}: {}".format(opaque_error, kind)
        # An empty string is treated like a missing text: no trailing
        # ": " is appended to the message.
        if text:
            msg += ": {}".format(text)
        super().__init__(msg)
        # `kind` only contributes to the message; it is not stored.
        self.opaque_error = opaque_error
        self.text = text
class AuthenticationFailure(SASLError):
    """
    A SASL error which indicates that the provided credentials are
    invalid. This may be raised by :class:`SASLInterface` methods.

    `opaque_error` and `text` are forwarded to :class:`SASLError`; the
    error kind is fixed to ``"authentication failed"``.
    """

    def __init__(
            self,
            opaque_error: typing.Any,
            text: typing.Optional[str] = None) -> None:
        super().__init__(opaque_error, "authentication failed", text=text)
class SASLFailure(SASLError):
    """
    A SASL protocol failure which is unrelated to the credentials passed.
    This may be raised by :class:`SASLInterface` methods.

    `opaque_error` and `text` are forwarded to :class:`SASLError`; the
    error kind is fixed to ``"SASL failure"``.
    """

    def __init__(
            self,
            opaque_error: typing.Any,
            text: typing.Optional[str] = None) -> None:
        super().__init__(opaque_error, "SASL failure", text=text)

    def promote_to_authentication_failure(self) -> AuthenticationFailure:
        """
        Return a new :class:`AuthenticationFailure` which carries the same
        `opaque_error` and `text` as this failure.

        This exception itself is left unchanged; the caller decides
        whether to raise the returned object.
        """
        return AuthenticationFailure(
            self.opaque_error,
            self.text)
class SASLState(enum.Enum):
    """
    The states of the SASL state machine.

    .. attribute:: CHALLENGE

       the server sent a SASL challenge

    .. attribute:: SUCCESS

       the authentication was successful

    .. attribute:: FAILURE

       the authentication failed

    Internal states used by the state machine:

    .. attribute:: INITIAL

       the state of the state machine before the
       authentication is started

    .. attribute:: SUCCESS_SIMULATE_CHALLENGE

       used to unwrap success replies that carry final data

    These internal states *must not* be returned by the
    :class:`SASLInterface` methods as first component of the result
    tuple.

    The following method is used to process replies returned
    by the :class:`SASLInterface` methods:

    .. method:: from_reply
    """

    INITIAL = "initial"
    CHALLENGE = "challenge"
    SUCCESS = "success"
    FAILURE = "failure"
    SUCCESS_SIMULATE_CHALLENGE = "success-simulate-challenge"

    @classmethod
    def from_reply(
            cls,
            state: typing.Union["SASLState", str]) -> "SASLState":
        """
        Compatibility layer for old :class:`SASLInterface`
        implementations.

        Accepts the following set of :class:`SASLState` or strings and
        maps the strings to :class:`SASLState` elements as follows:

        ``"challenge"``
          :attr:`SASLState.CHALLENGE`

        ``"failure"``
          :attr:`SASLState.FAILURE`

        ``"success"``
          :attr:`SASLState.SUCCESS`

        :param state: One of the three public states, given either as a
            :class:`SASLState` member or as its string value.
        :raises RuntimeError: if `state` is anything else, including the
            internal states :attr:`INITIAL` and
            :attr:`SUCCESS_SIMULATE_CHALLENGE` and their string values.
        :return: The corresponding :class:`SASLState` member.
        """
        # Enum members pass through unchanged, but only the public ones.
        if state in (SASLState.FAILURE, SASLState.SUCCESS,
                     SASLState.CHALLENGE):
            return state

        # Legacy string replies are mapped via the enum value lookup; the
        # explicit allow-list keeps internal states from being accepted.
        if state in ("failure", "success", "challenge"):
            return SASLState(state)

        raise RuntimeError("invalid SASL state", state)
# Two-element tuple: a SASLState followed by an optional bytes payload.
# NOTE(review): presumably the result tuple of the SASLInterface methods
# mentioned in the SASLState docstring -- confirm against the interface.
NextStateTuple = typing.Tuple[SASLState, typing.Optional[bytes]]
# Zero-argument callable returning a coroutine which resolves to a tuple of
# two strings.
# NOTE(review): presumably (username, password) -- confirm against callers.
CredentialProvider = typing.Callable[
[], typing.Coroutine[typing.Any, typing.Any, typing.Tuple[str, str]]
]