Source code for aiosasl.channel_binding

########################################################################
# File name: channel_binding.py
# This file is part of: aiosasl
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
"""
Channel binding methods
=======================

The module :mod:`aiosasl.channel_binding` provides
implementations of the :class:`~.ChannelBindingProvider`
interface for use with :mod:`ssl` and :mod:`OpenSSL`, respectively.

.. autoclass:: ChannelBindingProvider

.. autoclass:: StdlibTLS

.. autoclass:: TLSUnique

.. autoclass:: TLSServerEndPoint
"""
import abc
import functools
import ssl

try:
    import OpenSSL  # for mypy
except ImportError:
    pass


class ChannelBindingProvider(metaclass=abc.ABCMeta):
    """
    Interface for a channel binding method.

    The needed external information is supplied to the constructors of
    the specific instances.
    """

    # ``abc.abstractproperty`` has been deprecated since Python 3.3.
    # Stacking ``property`` on top of ``abc.abstractmethod`` is the
    # supported spelling and still makes ``cb_name`` an abstract,
    # read-only property that subclasses must override.
    @property
    @abc.abstractmethod
    def cb_name(self) -> bytes:
        """
        Return the name of the channel-binding mechanism.

        :rtype: :class:`bytes`
        """
        raise NotImplementedError

    @abc.abstractmethod
    def extract_cb_data(self) -> bytes:
        """
        Return the channel binding data.

        :returns: the channel binding data
        :rtype: :class:`bytes`
        """
        raise NotImplementedError
class StdlibTLS(ChannelBindingProvider):
    """
    Channel binding provider backed by the standard library :mod:`ssl`.

    The binding type is chosen by the caller. The same string is passed to
    the socket when the binding data is requested and, encoded as ASCII,
    reported as the mechanism name.

    :param connection: the SSL connection
    :type connection: :class:`ssl.SSLSocket`
    :param type_: the channel binding type
    :type type_: :class:`str`
    """

    def __init__(self, connection: ssl.SSLSocket, type_: str):
        super().__init__()
        self._type = type_
        self._connection = connection

    @property
    def cb_name(self) -> bytes:
        """
        Return the configured channel binding type, encoded as ASCII.

        :rtype: :class:`bytes`
        """
        binding_type = self._type
        return binding_type.encode("us-ascii")

    def extract_cb_data(self) -> bytes:
        """
        Return whatever the socket's ``get_channel_binding`` yields for
        the configured binding type.

        :rtype: :class:`bytes`
        """
        tls_socket = self._connection
        return tls_socket.get_channel_binding(self._type)  # type:ignore
class TLSUnique(ChannelBindingProvider):
    """
    Channel binding provider for ``tls-unique`` (see :rfc:`5929`) on top
    of :mod:`OpenSSL`.

    .. warning::

       This only supports connections that were not created by
       session resumption.

    :param connection: the SSL connection
    :type connection: :class:`OpenSSL.SSL.Connection`
    """

    def __init__(self, connection: "OpenSSL.SSL.Connection"):
        super().__init__()
        self._connection = connection

    @property
    def cb_name(self) -> bytes:
        """
        Return the fixed mechanism name ``b"tls-unique"``.

        :rtype: :class:`bytes`
        """
        return b"tls-unique"

    def extract_cb_data(self) -> bytes:
        """
        Return the result of ``get_finished()`` on the wrapped connection.

        :rtype: :class:`bytes`
        """
        tls_connection = self._connection
        return tls_connection.get_finished()
def parse_openssl_digest(digest: bytes) -> bytes:
    """
    Convert a colon-separated hexadecimal digest into raw bytes.

    *digest* is split on ``:``; every field is parsed as a base-16 integer
    and contributes exactly one byte to the result.

    :param digest: hexadecimal fields joined by ``:``, e.g. ``b"DE:AD"``
    :type digest: :class:`bytes`
    :raises ValueError: if a field is not a valid hexadecimal number or
        does not fit into a single byte
    :returns: the decoded digest
    :rtype: :class:`bytes`
    """
    hex_fields = digest.split(b":")
    return bytes(int(field, 16) for field in hex_fields)
class TLSServerEndPoint(ChannelBindingProvider):
    """
    Provider for the channel binding ``tls-server-end-point`` as
    specified by :rfc:`5929` for :mod:`OpenSSL`.

    :param connection: the SSL connection
    :type connection: :class:`OpenSSL.SSL.Connection`
    """

    def __init__(self, connection: "OpenSSL.SSL.Connection"):
        super().__init__()
        self._connection = connection

    @property
    def cb_name(self) -> bytes:
        """
        Return the fixed mechanism name ``b"tls-server-end-point"``.

        :rtype: :class:`bytes`
        """
        return b"tls-server-end-point"

    def extract_cb_data(self) -> bytes:
        """
        Return the digest of the peer certificate.

        The hash function is derived from the name of the certificate's
        signature algorithm. MD5 and SHA-1 are replaced by SHA-256.

        :raises NotImplementedError: if the signature algorithm name does
            not contain ``with`` and therefore no hash function can be
            derived from it
        :returns: the raw certificate digest
        :rtype: :class:`bytes`
        """
        cert = self._connection.get_peer_certificate()
        signature_name = cert.get_signature_algorithm().lower()
        head, separator, tail = signature_name.partition(b"with")
        if not separator:
            raise NotImplementedError

        # RSA-style names put the hash first ("sha256WithRSAEncryption"),
        # whereas ECDSA/DSA names put the key type first
        # ("ecdsa-with-SHA256", "dsa_with_SHA256", "dsaWithSHA1"). For the
        # latter the hash is the part *after* "with"; taking the part before
        # it would ask OpenSSL for a digest called "ecdsa-"/"dsa".
        algo = head.strip(b"-_")
        if algo in (b"ecdsa", b"dsa"):
            algo = tail.strip(b"-_")

        # RFC 5929 section 4.1: certificates signed using MD5 or SHA-1 are
        # hashed with SHA-256 instead.
        if algo in (b"sha1", b"md5"):
            algo = b"sha256"

        return parse_openssl_digest(cert.digest(algo.decode("us-ascii")))