Source code for aioxmpp.structs

"""
:mod:`~aioxmpp.structs` --- Simple data holders for common data types
#####################################################################

These classes provide a way to hold structured data which is commonly
encountered in the XMPP realm.

Jabber IDs
==========

.. autoclass:: JID(localpart, domain, resource)

Presence
========

.. autoclass:: PresenceState

Languages
=========

.. autoclass:: LanguageTag

.. autoclass:: LanguageRange

.. autoclass:: LanguageMap

Functions for working with language tags
----------------------------------------

.. autofunction:: basic_filter_languages

.. autofunction:: lookup_language

"""

import collections
import functools

from .stringprep import nodeprep, resourceprep, nameprep


[docs]class JID(collections.namedtuple("JID", ["localpart", "domain", "resource"])): """ A Jabber ID (JID). To construct a JID, either use the actual constructor, or use the :meth:`fromstr` class method. .. automethod:: fromstr Information about a JID: .. attribute:: localpart The localpart, stringprep’d from the argument to the constructor. .. attribute:: domain The domain, stringprep’d from the argument to the constructor. .. attribute:: resource The resource, stringprep’d from the argument to the constructor. .. autoattribute:: is_bare .. autoattribute:: is_domain :class:`JID` objects are immutable. To obtain a JID object with a changed property, use one of the following methods: .. automethod:: bare .. automethod:: replace(*, [localpart], [domain], [resource]) """ __slots__ = [] def __new__(cls, localpart, domain, resource): if localpart: localpart = nodeprep(localpart) if domain is not None: domain = nameprep(domain) if resource: resource = resourceprep(resource) if not domain: raise ValueError("domain must not be empty or None") if len(domain.encode("utf-8")) > 1023: raise ValueError("domain too long") if localpart is not None: if not localpart: raise ValueError("localpart must not be empty") if len(localpart.encode("utf-8")) > 1023: raise ValueError("localpart too long") if resource is not None: if not resource: raise ValueError("resource must not be empty") if len(resource.encode("utf-8")) > 1023: raise ValueError("resource too long") return super().__new__(cls, localpart, domain, resource)
[docs] def replace(self, **kwargs): """ Construct a new :class:`JID` object, using the values of the current JID. Use the arguments to override specific attributes on the new object. """ new_kwargs = {} try: localpart = kwargs.pop("localpart") except KeyError: pass else: if localpart: localpart = nodeprep(localpart) new_kwargs["localpart"] = localpart try: domain = kwargs.pop("domain") except KeyError: pass else: if not domain: raise ValueError("domain must not be empty or None") new_kwargs["domain"] = nameprep(domain) try: resource = kwargs.pop("resource") except KeyError: pass else: if resource: resource = resourceprep(resource) new_kwargs["resource"] = resource if kwargs: raise TypeError("replace() got an unexpected keyword argument" " {!r}".format( next(iter(kwargs)))) return super()._replace(**new_kwargs)
def __str__(self): result = self.domain if self.localpart: result = self.localpart + "@" + result if self.resource: result += "/" + self.resource return result
[docs] def bare(self): """ Return the bare version of this JID as new :class:`JID` object. """ return self.replace(resource=None)
@property
[docs] def is_bare(self): """ :data:`True` if the JID is bare, i.e. has an empty :attr:`resource` part. """ return not self.resource
@property
[docs] def is_domain(self): """ :data:`True` if the JID is a domain, i.e. if both the :attr:`localpart` and the :attr:`resource` are empty. """ return not self.resource and not self.localpart
@classmethod
[docs] def fromstr(cls, s): """ Obtain a :class:`JID` object by parsing a JID from the given string `s`. """ localpart, sep, domain = s.partition("@") if not sep: domain = localpart localpart = None domain, sep, resource = domain.partition("/") if not sep: resource = None return cls(localpart, domain, resource)
@functools.total_ordering
[docs]class PresenceState: """ Hold a presence state of an XMPP resource, as defined by the presence stanza semantics. `available` must be a boolean value, which defines whether the resource is available or not. If the resource is available, `show` may be set to one of ``"dnd"``, ``"xa"``, ``"away"``, :data:`None`, ``"chat"`` (it is a :class:`ValueError` to attempt to set `show` to a non-:data:`None` value if `available` is false). :class:`PresenceState` objects are ordered by their availability and by their show values. Non-availability sorts lower than availability, and for available presence states the order is in the order of valid values given for the `show` above. .. attribute:: available As per the argument to the constructor, converted to a :class:`bool`. .. attribute:: show As per the argument to the constructor. .. automethod:: apply_to_stanza .. automethod:: from_stanza :class:`PresenceState` objects are immutable. """ SHOW_VALUES = ["dnd", "xa", "away", None, "chat"] SHOW_VALUE_WEIGHT = { value: i for i, value in enumerate(SHOW_VALUES) } __slots__ = ["_available", "_show"] def __init__(self, available=False, show=None): super().__init__() if not available and show: raise ValueError("Unavailable state cannot have show value") if show not in PresenceState.SHOW_VALUES: raise ValueError("Not a valid show value") self._available = bool(available) self._show = show @property
[docs] def available(self): return self._available
@property
[docs] def show(self): return self._show
def __lt__(self, other): my_key = (self.available, PresenceState.SHOW_VALUE_WEIGHT[self.show]) other_key = (other.available, PresenceState.SHOW_VALUE_WEIGHT[other.show]) return my_key < other_key def __eq__(self, other): try: return (self.available == other.available and self.show == other.show) except AttributeError: return NotImplemented def __repr__(self): more = "" if self.available: if self.show: more = " available show={!r}".format(self.show) else: more = " available" return "<PresenceState{}>".format(more)
[docs] def apply_to_stanza(self, stanza_obj): """ Apply the properties of this :class:`PresenceState` to a :class:`~aioxmpp.stanza.Presence` `stanza_obj`. The :attr:`~aioxmpp.stanza.Presence.type_` and :attr:`~aioxmpp.stanza.Presence.show` attributes of the object will be modified to fit the values in this object. """ if self.available: stanza_obj.type_ = None else: stanza_obj.type_ = "unavailable" stanza_obj.show = self.show
@classmethod
[docs] def from_stanza(cls, stanza_obj, strict=False): """ Create and return a new :class:`PresenceState` object which inherits the presence state as advertised in the given :class:`~aioxmpp.stanza.Presence` stanza. If `strict` is :data:`True`, the value of `show` is strictly checked, that is, it is required to be :data:`None` if the stanza indicates an unavailable state. The default is not to check this. """ if stanza_obj.type_ != "unavailable" and stanza_obj.type_ is not None: raise ValueError("presence state stanza required") available = not stanza_obj.type_ if not strict: show = stanza_obj.show if available else None else: show = stanza_obj.show return cls(available=available, show=show)
@functools.total_ordering
[docs]class LanguageTag: """ Implementation of a language tag. This may be a fully RFC5646 compliant implementation some day, but for now it is only very simplistic stub. There is no input validation of any kind. :class:`LanguageTag` instances compare and hash case-insensitively. .. automethod:: fromstr .. autoattribute:: match_str .. autoattribute:: print_str """ __slots__ = ("_tag",) def __init__(self, *, tag=None): if not tag: raise ValueError("tag cannot be empty") self._tag = tag @property
[docs] def match_str(self): """ The string which is used for matching two lanugage tags. This is the lower-cased version of the :attr:`print_str`. """ return self._tag.lower()
@property
[docs] def print_str(self): """ The stringified language tag. """ return self._tag
@classmethod
[docs] def fromstr(cls, s): """ Create a language tag from the given string `s`. .. note:: This is a stub implementation which merely refers to the given string as the :attr:`print_str` and derives the :attr:`match_str` from that. """ return cls(tag=s)
def __str__(self): return self.print_str def __eq__(self, other): try: return self.match_str == other.match_str except AttributeError: return False def __lt__(self, other): return self.match_str < other.match_str def __le__(self, other): return self.match_str <= other.match_str def __hash__(self): return hash(self.match_str) def __repr__(self): return "<{}.{}.fromstr({!r})>".format( type(self).__module__, type(self).__qualname__, str(self))
[docs]class LanguageRange: """ Implementation of a language range. This may be a fully RFC4647 compliant implementation some day, but for now it is only very simplistic stub. There is no input validation of any kind. :class:`LanguageRange` instances compare and hash case-insensitively. .. automethod:: fromstr .. automethod:: strip_rightmost .. autoattribute:: match_str .. autoattribute:: print_str """ __slots__ = ("_tag",) def __init__(self, *, tag=None): if not tag: raise ValueError("range cannot be empty") self._tag = tag @property
[docs] def match_str(self): """ The string which is used for matching two lanugage tags. This is the lower-cased version of the :attr:`print_str`. """ return self._tag.lower()
@property
[docs] def print_str(self): """ The stringified language tag. """ return self._tag
@classmethod
[docs] def fromstr(cls, s): """ Create a language tag from the given string `s`. .. note:: This is a stub implementation which merely refers to the given string as the :attr:`print_str` and derives the :attr:`match_str` from that. """ if s == "*": return cls.WILDCARD return cls(tag=s)
def __str__(self): return self.print_str def __eq__(self, other): try: return self.match_str == other.match_str except AttributeError: return False def __hash__(self): return hash(self.match_str) def __repr__(self): return "<{}.{}.fromstr({!r})>".format( type(self).__module__, type(self).__qualname__, str(self))
[docs] def strip_rightmost(self): """ Strip the rightmost part of the language range. If the new rightmost part is a singleton or ``x`` (i.e. starts an extension or private use part), it is also stripped. Return the newly created :class:`LanguageRange`. """ parts = self.print_str.split("-") parts.pop() if parts and len(parts[-1]) == 1: parts.pop() return type(self).fromstr("-".join(parts))
LanguageRange.WILDCARD = LanguageRange(tag="*")
[docs]def basic_filter_languages(languages, ranges): """ Filter languages using the string-based basic filter algorithm described in RFC4647. `languages` must be a sequence of :class:`LanguageTag` instances which are to be filtered. `ranges` must be an iterable which represent the basic language ranges to filter with, in priority order. The language ranges must be given as :class:`LanguageRange` objects. Return an iterator of languages which matched any of the `ranges`. The sequence produced by the iterator is in match order and duplicate-free. The first range to match a language yields the language into the iterator, no other range can yield that language afterwards. """ if LanguageRange.WILDCARD in ranges: yield from languages return found = set() for language_range in ranges: range_str = language_range.match_str for language in languages: if language in found: continue match_str = language.match_str if match_str == range_str: yield language found.add(language) continue if len(range_str) < len(match_str): if (match_str[:len(range_str)] == range_str and match_str[len(range_str)] == "-"): yield language found.add(language) continue
[docs]def lookup_language(languages, ranges): """ Look up a single language in the sequence `languages` using the lookup mechansim described in RFC4647. If no match is found, :data:`None` is returned. Otherwise, the first matching language is returned. `languages` must be a sequence of :class:`LanguageTag` objects, while `ranges` must be an iterable of :class:`LanguageRange` objects. """ for language_range in ranges: while True: try: return next(iter(basic_filter_languages( languages, [language_range]))) except StopIteration: pass try: language_range = language_range.strip_rightmost() except ValueError: break
[docs]class LanguageMap(dict): """ A :class:`dict` subclass specialized for holding :class:`LanugageTag` instances as keys. In addition to the interface provided by :class:`dict`, instances of this class also have the following method: .. automethod:: lookup """
[docs] def lookup(self, language_ranges): """ Perform an RFC4647 language range lookup on the keys in the dictionary. `language_ranges` must be a sequence of :class:`LanguageRange` instances. Return the entry in the dictionary with a key as produced by `lookup_language`. If `lookup_language` does not find a match and the mapping contains an entry with key :data:`None`, that entry is returned, otherwise :class:`KeyError` is raised. """ keys = list(self.keys()) try: keys.remove(None) except ValueError: pass keys.sort() key = lookup_language(keys, language_ranges) return self[key]

aioxmpp

Navigation