Source code for aioxmpp.structs

"""
:mod:`~aioxmpp.structs` --- Simple data holders for common data types
#####################################################################

These classes provide a way to hold structured data which is commonly
encountered in the XMPP realm.

Jabber IDs
==========

.. autoclass:: JID(localpart, domain, resource)

Presence
========

.. autoclass:: PresenceState

Languages
=========

.. autoclass:: LanguageTag

.. autoclass:: LanguageRange

.. autoclass:: LanguageMap

Functions for working with language tags
----------------------------------------

.. autofunction:: basic_filter_languages

.. autofunction:: lookup_language

"""

import collections
import functools

from .stringprep import nodeprep, resourceprep, nameprep


[docs]class JID(collections.namedtuple("JID", ["localpart", "domain", "resource"])): """ A Jabber ID (JID). To construct a JID, either use the actual constructor, or use the :meth:`fromstr` class method. If `strict` is false, unassigned codepoints are allowed in any of the parts of the JID. In the future, other deviations from the respective stringprep profiles may be allowed, too. The idea is to use non-`strict` when output is received from outside and when it is reflected, following the old principle "be conservative in what you send and liberal in what you receive". Otherwise, strict checking should be enabled. This brings maximum interoperability. .. automethod:: fromstr Information about a JID: .. attribute:: localpart The localpart, stringprep’d from the argument to the constructor. .. attribute:: domain The domain, stringprep’d from the argument to the constructor. .. attribute:: resource The resource, stringprep’d from the argument to the constructor. .. autoattribute:: is_bare .. autoattribute:: is_domain :class:`JID` objects are immutable. To obtain a JID object with a changed property, use one of the following methods: .. automethod:: bare .. automethod:: replace(*, [localpart], [domain], [resource]) """ __slots__ = [] def __new__(cls, localpart, domain, resource, *, strict=True): if localpart: localpart = nodeprep( localpart, allow_unassigned=not strict ) if domain is not None: domain = nameprep( domain, allow_unassigned=not strict ) if resource: resource = resourceprep( resource, allow_unassigned=not strict ) if not domain: raise ValueError("domain must not be empty or None") if len(domain.encode("utf-8")) > 1023: raise ValueError("domain too long") if localpart is not None: if not localpart: raise ValueError("localpart must not be empty") if len(localpart.encode("utf-8")) > 1023: raise ValueError("localpart too long") if resource is not None: if not resource: raise ValueError("resource must not be empty") if len(resource.encode("utf-8")) > 1023: raise ValueError("resource too long") return super().__new__(cls, localpart, domain, resource)
[docs] def replace(self, **kwargs): """ Construct a new :class:`JID` object, using the values of the current JID. Use the arguments to override specific attributes on the new object. """ new_kwargs = {} strict = kwargs.pop("strict", True) try: localpart = kwargs.pop("localpart") except KeyError: pass else: if localpart: localpart = nodeprep( localpart, allow_unassigned=not strict ) new_kwargs["localpart"] = localpart try: domain = kwargs.pop("domain") except KeyError: pass else: if not domain: raise ValueError("domain must not be empty or None") new_kwargs["domain"] = nameprep( domain, allow_unassigned=not strict ) try: resource = kwargs.pop("resource") except KeyError: pass else: if resource: resource = resourceprep( resource, allow_unassigned=not strict ) new_kwargs["resource"] = resource if kwargs: raise TypeError("replace() got an unexpected keyword argument" " {!r}".format( next(iter(kwargs)))) return super()._replace(**new_kwargs)
def __str__(self): result = self.domain if self.localpart: result = self.localpart + "@" + result if self.resource: result += "/" + self.resource return result
[docs] def bare(self): """ Return the bare version of this JID as new :class:`JID` object. """ return self.replace(resource=None)
@property
[docs] def is_bare(self): """ :data:`True` if the JID is bare, i.e. has an empty :attr:`resource` part. """ return not self.resource
@property
[docs] def is_domain(self): """ :data:`True` if the JID is a domain, i.e. if both the :attr:`localpart` and the :attr:`resource` are empty. """ return not self.resource and not self.localpart
@classmethod
[docs] def fromstr(cls, s, *, strict=True): """ Obtain a :class:`JID` object by parsing a JID from the given string `s`. """ localpart, sep, domain = s.partition("@") if not sep: domain = localpart localpart = None domain, sep, resource = domain.partition("/") if not sep: resource = None return cls(localpart, domain, resource, strict=strict)
@functools.total_ordering
[docs]class PresenceState: """ Hold a presence state of an XMPP resource, as defined by the presence stanza semantics. `available` must be a boolean value, which defines whether the resource is available or not. If the resource is available, `show` may be set to one of ``"dnd"``, ``"xa"``, ``"away"``, :data:`None`, ``"chat"`` (it is a :class:`ValueError` to attempt to set `show` to a non-:data:`None` value if `available` is false). :class:`PresenceState` objects are ordered by their availability and by their show values. Non-availability sorts lower than availability, and for available presence states the order is in the order of valid values given for the `show` above. .. attribute:: available As per the argument to the constructor, converted to a :class:`bool`. .. attribute:: show As per the argument to the constructor. .. automethod:: apply_to_stanza .. automethod:: from_stanza :class:`PresenceState` objects are immutable. """ SHOW_VALUES = ["dnd", "xa", "away", None, "chat"] SHOW_VALUE_WEIGHT = { value: i for i, value in enumerate(SHOW_VALUES) } __slots__ = ["_available", "_show"] def __init__(self, available=False, show=None): super().__init__() if not available and show: raise ValueError("Unavailable state cannot have show value") if show not in PresenceState.SHOW_VALUES: raise ValueError("Not a valid show value") self._available = bool(available) self._show = show @property
[docs] def available(self): return self._available
@property
[docs] def show(self): return self._show
def __lt__(self, other): my_key = (self.available, PresenceState.SHOW_VALUE_WEIGHT[self.show]) other_key = (other.available, PresenceState.SHOW_VALUE_WEIGHT[other.show]) return my_key < other_key def __eq__(self, other): try: return (self.available == other.available and self.show == other.show) except AttributeError: return NotImplemented def __repr__(self): more = "" if self.available: if self.show: more = " available show={!r}".format(self.show) else: more = " available" return "<PresenceState{}>".format(more)
[docs] def apply_to_stanza(self, stanza_obj): """ Apply the properties of this :class:`PresenceState` to a :class:`~aioxmpp.stanza.Presence` `stanza_obj`. The :attr:`~aioxmpp.stanza.Presence.type_` and :attr:`~aioxmpp.stanza.Presence.show` attributes of the object will be modified to fit the values in this object. """ if self.available: stanza_obj.type_ = None else: stanza_obj.type_ = "unavailable" stanza_obj.show = self.show
@classmethod
[docs] def from_stanza(cls, stanza_obj, strict=False): """ Create and return a new :class:`PresenceState` object which inherits the presence state as advertised in the given :class:`~aioxmpp.stanza.Presence` stanza. If `strict` is :data:`True`, the value of `show` is strictly checked, that is, it is required to be :data:`None` if the stanza indicates an unavailable state. The default is not to check this. """ if stanza_obj.type_ != "unavailable" and stanza_obj.type_ is not None: raise ValueError("presence state stanza required") available = not stanza_obj.type_ if not strict: show = stanza_obj.show if available else None else: show = stanza_obj.show return cls(available=available, show=show)
@functools.total_ordering
[docs]class LanguageTag: """ Implementation of a language tag. This may be a fully RFC5646 compliant implementation some day, but for now it is only very simplistic stub. There is no input validation of any kind. :class:`LanguageTag` instances compare and hash case-insensitively. .. automethod:: fromstr .. autoattribute:: match_str .. autoattribute:: print_str """ __slots__ = ("_tag",) def __init__(self, *, tag=None): if not tag: raise ValueError("tag cannot be empty") self._tag = tag @property
[docs] def match_str(self): """ The string which is used for matching two lanugage tags. This is the lower-cased version of the :attr:`print_str`. """ return self._tag.lower()
@property
[docs] def print_str(self): """ The stringified language tag. """ return self._tag
@classmethod
[docs] def fromstr(cls, s): """ Create a language tag from the given string `s`. .. note:: This is a stub implementation which merely refers to the given string as the :attr:`print_str` and derives the :attr:`match_str` from that. """ return cls(tag=s)
def __str__(self): return self.print_str def __eq__(self, other): try: return self.match_str == other.match_str except AttributeError: return False def __lt__(self, other): return self.match_str < other.match_str def __le__(self, other): return self.match_str <= other.match_str def __hash__(self): return hash(self.match_str) def __repr__(self): return "<{}.{}.fromstr({!r})>".format( type(self).__module__, type(self).__qualname__, str(self))
[docs]class LanguageRange: """ Implementation of a language range. This may be a fully RFC4647 compliant implementation some day, but for now it is only very simplistic stub. There is no input validation of any kind. :class:`LanguageRange` instances compare and hash case-insensitively. .. automethod:: fromstr .. automethod:: strip_rightmost .. autoattribute:: match_str .. autoattribute:: print_str """ __slots__ = ("_tag",) def __init__(self, *, tag=None): if not tag: raise ValueError("range cannot be empty") self._tag = tag @property
[docs] def match_str(self): """ The string which is used for matching two lanugage tags. This is the lower-cased version of the :attr:`print_str`. """ return self._tag.lower()
@property
[docs] def print_str(self): """ The stringified language tag. """ return self._tag
@classmethod
[docs] def fromstr(cls, s): """ Create a language tag from the given string `s`. .. note:: This is a stub implementation which merely refers to the given string as the :attr:`print_str` and derives the :attr:`match_str` from that. """ if s == "*": return cls.WILDCARD return cls(tag=s)
def __str__(self): return self.print_str def __eq__(self, other): try: return self.match_str == other.match_str except AttributeError: return False def __hash__(self): return hash(self.match_str) def __repr__(self): return "<{}.{}.fromstr({!r})>".format( type(self).__module__, type(self).__qualname__, str(self))
[docs] def strip_rightmost(self): """ Strip the rightmost part of the language range. If the new rightmost part is a singleton or ``x`` (i.e. starts an extension or private use part), it is also stripped. Return the newly created :class:`LanguageRange`. """ parts = self.print_str.split("-") parts.pop() if parts and len(parts[-1]) == 1: parts.pop() return type(self).fromstr("-".join(parts))
LanguageRange.WILDCARD = LanguageRange(tag="*")
[docs]def basic_filter_languages(languages, ranges): """ Filter languages using the string-based basic filter algorithm described in RFC4647. `languages` must be a sequence of :class:`LanguageTag` instances which are to be filtered. `ranges` must be an iterable which represent the basic language ranges to filter with, in priority order. The language ranges must be given as :class:`LanguageRange` objects. Return an iterator of languages which matched any of the `ranges`. The sequence produced by the iterator is in match order and duplicate-free. The first range to match a language yields the language into the iterator, no other range can yield that language afterwards. """ if LanguageRange.WILDCARD in ranges: yield from languages return found = set() for language_range in ranges: range_str = language_range.match_str for language in languages: if language in found: continue match_str = language.match_str if match_str == range_str: yield language found.add(language) continue if len(range_str) < len(match_str): if (match_str[:len(range_str)] == range_str and match_str[len(range_str)] == "-"): yield language found.add(language) continue
[docs]def lookup_language(languages, ranges): """ Look up a single language in the sequence `languages` using the lookup mechansim described in RFC4647. If no match is found, :data:`None` is returned. Otherwise, the first matching language is returned. `languages` must be a sequence of :class:`LanguageTag` objects, while `ranges` must be an iterable of :class:`LanguageRange` objects. """ for language_range in ranges: while True: try: return next(iter(basic_filter_languages( languages, [language_range]))) except StopIteration: pass try: language_range = language_range.strip_rightmost() except ValueError: break
[docs]class LanguageMap(dict): """ A :class:`dict` subclass specialized for holding :class:`LanugageTag` instances as keys. In addition to the interface provided by :class:`dict`, instances of this class also have the following method: .. automethod:: lookup """
[docs] def lookup(self, language_ranges): """ Perform an RFC4647 language range lookup on the keys in the dictionary. `language_ranges` must be a sequence of :class:`LanguageRange` instances. Return the entry in the dictionary with a key as produced by `lookup_language`. If `lookup_language` does not find a match and the mapping contains an entry with key :data:`None`, that entry is returned, otherwise :class:`KeyError` is raised. """ keys = list(self.keys()) try: keys.remove(None) except ValueError: pass keys.sort() key = lookup_language(keys, language_ranges) return self[key]