"""
:mod:`~aioxmpp.structs` --- Simple data holders for common data types
#####################################################################
These classes provide a way to hold structured data which is commonly
encountered in the XMPP realm.
Jabber IDs
==========
.. autoclass:: JID(localpart, domain, resource)
Presence
========
.. autoclass:: PresenceState
Languages
=========
.. autoclass:: LanguageTag
.. autoclass:: LanguageRange
.. autoclass:: LanguageMap
Functions for working with language tags
----------------------------------------
.. autofunction:: basic_filter_languages
.. autofunction:: lookup_language
"""
import collections
import functools
from .stringprep import nodeprep, resourceprep, nameprep
class JID(collections.namedtuple("JID", ["localpart", "domain", "resource"])):
    """
    A Jabber ID (JID). To construct a JID, either use the actual constructor,
    or use the :meth:`fromstr` class method.

    If `strict` is false, unassigned codepoints are allowed in any of the parts
    of the JID. In the future, other deviations from the respective stringprep
    profiles may be allowed, too.

    The idea is to use non-`strict` when output is received from outside and
    when it is reflected, following the old principle "be conservative in what
    you send and liberal in what you receive". Otherwise, strict checking
    should be enabled. This brings maximum interoperability.

    .. automethod:: fromstr

    Information about a JID:

    .. attribute:: localpart

       The localpart, stringprep'd from the argument to the constructor.

    .. attribute:: domain

       The domain, stringprep'd from the argument to the constructor.

    .. attribute:: resource

       The resource, stringprep'd from the argument to the constructor.

    .. autoattribute:: is_bare

    .. autoattribute:: is_domain

    :class:`JID` objects are immutable. To obtain a JID object with a changed
    property, use one of the following methods:

    .. automethod:: bare

    .. automethod:: replace(*, [localpart], [domain], [resource])
    """

    __slots__ = []

    def __new__(cls, localpart, domain, resource, *, strict=True):
        """
        Prepare and validate the three parts, then build the tuple.

        Raise :class:`ValueError` if the domain is empty or :data:`None`, if
        a localpart or resource is given but empty, or if any part exceeds
        1023 bytes when encoded as UTF-8.
        """
        # Falsy localpart/resource values skip stringprep; an empty string is
        # then rejected by the checks below, while None means "absent".
        if localpart:
            localpart = nodeprep(localpart, allow_unassigned=not strict)
        if domain is not None:
            domain = nameprep(domain, allow_unassigned=not strict)
        if resource:
            resource = resourceprep(resource, allow_unassigned=not strict)

        if not domain:
            raise ValueError("domain must not be empty or None")
        if len(domain.encode("utf-8")) > 1023:
            raise ValueError("domain too long")

        if localpart is not None:
            if not localpart:
                raise ValueError("localpart must not be empty")
            if len(localpart.encode("utf-8")) > 1023:
                raise ValueError("localpart too long")

        if resource is not None:
            if not resource:
                raise ValueError("resource must not be empty")
            if len(resource.encode("utf-8")) > 1023:
                raise ValueError("resource too long")

        return super().__new__(cls, localpart, domain, resource)

    def replace(self, **kwargs):
        """
        Construct a new :class:`JID` object, using the values of the current
        JID. Use the arguments to override specific attributes on the new
        object.

        Accepted keyword arguments are `localpart`, `domain`, `resource` and
        `strict` (default :data:`True`, passed on to the stringprep
        functions). Only the parts which are passed are stringprep'd again.

        Raise :class:`ValueError` if `domain` is passed but empty or
        :data:`None`, and :class:`TypeError` on any other keyword argument.
        """
        new_kwargs = {}
        strict = kwargs.pop("strict", True)

        if "localpart" in kwargs:
            localpart = kwargs.pop("localpart")
            if localpart:
                localpart = nodeprep(localpart, allow_unassigned=not strict)
            new_kwargs["localpart"] = localpart

        if "domain" in kwargs:
            domain = kwargs.pop("domain")
            if not domain:
                raise ValueError("domain must not be empty or None")
            new_kwargs["domain"] = nameprep(
                domain,
                allow_unassigned=not strict
            )

        if "resource" in kwargs:
            resource = kwargs.pop("resource")
            if resource:
                resource = resourceprep(resource, allow_unassigned=not strict)
            new_kwargs["resource"] = resource

        if kwargs:
            raise TypeError("replace() got an unexpected keyword argument"
                            " {!r}".format(
                                next(iter(kwargs))))

        # The namedtuple _replace() builds the new tuple directly, without
        # going through __new__; the untouched parts are not prepared again.
        return super()._replace(**new_kwargs)

    def __str__(self):
        result = self.domain
        if self.localpart:
            result = self.localpart + "@" + result
        if self.resource:
            result += "/" + self.resource
        return result

    def bare(self):
        """
        Return the bare version of this JID as new :class:`JID` object.
        """
        return self.replace(resource=None)

    @property
    def is_bare(self):
        """
        :data:`True` if the JID is bare, i.e. has an empty :attr:`resource`
        part.
        """
        return not self.resource

    @property
    def is_domain(self):
        """
        :data:`True` if the JID is a domain, i.e. if both the :attr:`localpart`
        and the :attr:`resource` are empty.
        """
        return not self.resource and not self.localpart

    @classmethod
    def fromstr(cls, s, *, strict=True):
        """
        Obtain a :class:`JID` object by parsing a JID from the given string
        `s`. `strict` is passed on to the constructor.

        The resource is split off at the first ``/`` *before* the localpart
        is split off at the first ``@``. A resource may itself contain ``@``
        and ``/``, so ``"example.com/foo@bar"`` is the domain ``example.com``
        with the resource ``foo@bar``.
        """
        nodedomain, sep, resource = s.partition("/")
        if not sep:
            resource = None

        localpart, sep, domain = nodedomain.partition("@")
        if not sep:
            # No "@" at all: the whole string before the "/" is the domain.
            domain = localpart
            localpart = None

        return cls(localpart, domain, resource, strict=strict)
@functools.total_ordering
class PresenceState:
    """
    Hold a presence state of an XMPP resource, as defined by the presence
    stanza semantics.

    `available` must be a boolean value, which defines whether the resource is
    available or not. If the resource is available, `show` may be set to one of
    ``"dnd"``, ``"xa"``, ``"away"``, :data:`None`, ``"chat"`` (it is a
    :class:`ValueError` to attempt to set `show` to a non-:data:`None` value if
    `available` is false).

    :class:`PresenceState` objects are ordered by their availability and by
    their show values. Non-availability sorts lower than availability, and for
    available presence states the order is in the order of valid values given
    for the `show` above.

    .. attribute:: available

       As per the argument to the constructor, converted to a :class:`bool`.

    .. attribute:: show

       As per the argument to the constructor.

    .. automethod:: apply_to_stanza

    .. automethod:: from_stanza

    :class:`PresenceState` objects are immutable and hashable; equal states
    have equal hashes.
    """

    SHOW_VALUES = ["dnd", "xa", "away", None, "chat"]
    # Position of each show value in SHOW_VALUES, used as the sort weight.
    SHOW_VALUE_WEIGHT = {
        value: i
        for i, value in enumerate(SHOW_VALUES)
    }

    __slots__ = ["_available", "_show"]

    def __init__(self, available=False, show=None):
        super().__init__()
        if not available and show:
            raise ValueError("Unavailable state cannot have show value")
        if show not in PresenceState.SHOW_VALUES:
            raise ValueError("Not a valid show value")
        self._available = bool(available)
        self._show = show

    @property
    def available(self):
        return self._available

    @property
    def show(self):
        return self._show

    def __lt__(self, other):
        try:
            other_key = (other.available,
                         PresenceState.SHOW_VALUE_WEIGHT[other.show])
        except AttributeError:
            # Not a presence-state-like object: let Python try the reflected
            # operation (and eventually raise TypeError), as __eq__ does.
            return NotImplemented
        my_key = (self.available,
                  PresenceState.SHOW_VALUE_WEIGHT[self.show])
        return my_key < other_key

    def __eq__(self, other):
        try:
            return (self.available == other.available and
                    self.show == other.show)
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone would set __hash__ to None; hash over the
        # same two values that __eq__ compares.
        return hash((self._available, self._show))

    def __repr__(self):
        more = ""
        if self.available:
            if self.show:
                more = " available show={!r}".format(self.show)
            else:
                more = " available"
        return "<PresenceState{}>".format(more)

    def apply_to_stanza(self, stanza_obj):
        """
        Apply the properties of this :class:`PresenceState` to a
        :class:`~aioxmpp.stanza.Presence` `stanza_obj`. The
        :attr:`~aioxmpp.stanza.Presence.type_` and
        :attr:`~aioxmpp.stanza.Presence.show` attributes of the object will be
        modified to fit the values in this object.
        """
        if self.available:
            stanza_obj.type_ = None
        else:
            stanza_obj.type_ = "unavailable"
        stanza_obj.show = self.show

    @classmethod
    def from_stanza(cls, stanza_obj, strict=False):
        """
        Create and return a new :class:`PresenceState` object which inherits
        the presence state as advertised in the given
        :class:`~aioxmpp.stanza.Presence` stanza.

        If `strict` is :data:`True`, the value of `show` is strictly checked,
        that is, it is required to be :data:`None` if the stanza indicates an
        unavailable state; otherwise the constructor raises
        :class:`ValueError`.

        The default is not to check this: the `show` value of an unavailable
        stanza is ignored.

        Raise :class:`ValueError` if the stanza type is neither :data:`None`
        nor ``"unavailable"``.
        """
        if stanza_obj.type_ != "unavailable" and stanza_obj.type_ is not None:
            raise ValueError("presence state stanza required")
        available = not stanza_obj.type_
        if not strict:
            show = stanza_obj.show if available else None
        else:
            show = stanza_obj.show
        return cls(available=available, show=show)
@functools.total_ordering
class LanguageTag:
    """
    Implementation of a language tag. This may be a fully RFC5646 compliant
    implementation some day, but for now it is only very simplistic stub.

    There is no input validation of any kind, except that the tag must not
    be empty.

    :class:`LanguageTag` instances compare and hash case-insensitively.

    .. automethod:: fromstr

    .. autoattribute:: match_str

    .. autoattribute:: print_str
    """

    __slots__ = ("_tag",)

    def __init__(self, *, tag=None):
        if not tag:
            raise ValueError("tag cannot be empty")
        self._tag = tag

    @property
    def match_str(self):
        """
        The string which is used for matching two language tags. This is the
        lower-cased version of the :attr:`print_str`.
        """
        return self._tag.lower()

    @property
    def print_str(self):
        """
        The stringified language tag.
        """
        return self._tag

    @classmethod
    def fromstr(cls, s):
        """
        Create a language tag from the given string `s`.

        .. note::

           This is a stub implementation which merely refers to the given
           string as the :attr:`print_str` and derives the :attr:`match_str`
           from that.
        """
        return cls(tag=s)

    def __str__(self):
        return self.print_str

    # The comparison operators are duck-typed on ``match_str``. For operands
    # without that attribute they return NotImplemented, so that ``==``
    # falls back to identity and the ordering operators raise TypeError
    # rather than leaking an AttributeError.

    def __eq__(self, other):
        try:
            return self.match_str == other.match_str
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.match_str < other.match_str
        except AttributeError:
            return NotImplemented

    def __le__(self, other):
        try:
            return self.match_str <= other.match_str
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        return hash(self.match_str)

    def __repr__(self):
        return "<{}.{}.fromstr({!r})>".format(
            type(self).__module__,
            type(self).__qualname__,
            str(self))
class LanguageRange:
    """
    Implementation of a language range. This may be a fully RFC4647 compliant
    implementation some day, but for now it is only very simplistic stub.

    There is no input validation of any kind, except that the range must not
    be empty.

    :class:`LanguageRange` instances compare and hash case-insensitively.

    .. automethod:: fromstr

    .. automethod:: strip_rightmost

    .. autoattribute:: match_str

    .. autoattribute:: print_str
    """

    __slots__ = ("_tag",)

    def __init__(self, *, tag=None):
        if not tag:
            raise ValueError("range cannot be empty")
        self._tag = tag

    @property
    def match_str(self):
        """
        The string which is used for matching language ranges against
        language tags. This is the lower-cased version of the
        :attr:`print_str`.
        """
        return self.print_str.lower()

    @property
    def print_str(self):
        """
        The stringified language range.
        """
        return self._tag

    @classmethod
    def fromstr(cls, s):
        """
        Create a language range from the given string `s`.

        The string ``"*"`` always yields the shared :attr:`WILDCARD`
        instance.

        .. note::

           This is a stub implementation which merely refers to the given
           string as the :attr:`print_str` and derives the :attr:`match_str`
           from that.
        """
        return cls.WILDCARD if s == "*" else cls(tag=s)

    def __str__(self):
        return self.print_str

    def __eq__(self, other):
        try:
            theirs = other.match_str
        except AttributeError:
            return False
        return self.match_str == theirs

    def __hash__(self):
        return hash(self.match_str)

    def __repr__(self):
        cls = type(self)
        return "<{}.{}.fromstr({!r})>".format(
            cls.__module__,
            cls.__qualname__,
            str(self))

    def strip_rightmost(self):
        """
        Strip the rightmost part of the language range. If the new rightmost
        part is a singleton or ``x`` (i.e. starts an extension or private use
        part), it is also stripped.

        Return the newly created :class:`LanguageRange`. If nothing is left
        after stripping, :class:`ValueError` is raised by the constructor.
        """
        remaining = self.print_str.split("-")[:-1]
        if remaining and len(remaining[-1]) == 1:
            # A one-character subtag cannot stand at the end of a range.
            del remaining[-1]
        return type(self).fromstr("-".join(remaining))


# The range matching every language; fromstr("*") returns this very object.
LanguageRange.WILDCARD = LanguageRange(tag="*")
def basic_filter_languages(languages, ranges):
    """
    Filter languages using the string-based basic filter algorithm described in
    RFC4647.

    `languages` must be a sequence of :class:`LanguageTag` instances which are
    to be filtered.

    `ranges` must be an iterable which represent the basic language ranges to
    filter with, in priority order. The language ranges must be given as
    :class:`LanguageRange` objects. One-shot iterators are supported; the
    iterable is consumed exactly once.

    Return an iterator of languages which matched any of the `ranges`. The
    sequence produced by the iterator is in match order and duplicate-free. The
    first range to match a language yields the language into the iterator, no
    other range can yield that language afterwards.

    If the wildcard range is among `ranges`, all of `languages` are yielded
    as-is, in their original order.
    """
    # Materialise once: the membership test below would otherwise consume a
    # one-shot iterator and leave nothing (or only a tail) for the loop.
    ranges = list(ranges)

    if LanguageRange.WILDCARD in ranges:
        yield from languages
        return

    found = set()

    for language_range in ranges:
        range_str = language_range.match_str
        # A range matches a tag which is equal to it, or which continues
        # with a "-" right after the range.
        range_prefix = range_str + "-"
        for language in languages:
            if language in found:
                continue

            match_str = language.match_str
            if match_str == range_str or match_str.startswith(range_prefix):
                yield language
                found.add(language)
def lookup_language(languages, ranges):
    """
    Look up a single language in the sequence `languages` using the lookup
    mechanism described in RFC4647. If no match is found, :data:`None` is
    returned. Otherwise, the first matching language is returned.

    `languages` must be a sequence of :class:`LanguageTag` objects, while
    `ranges` must be an iterable of :class:`LanguageRange` objects.

    Each range is tried in order. If a range matches nothing, it is
    progressively truncated with :meth:`LanguageRange.strip_rightmost` and
    retried, until it cannot be truncated further.
    """
    no_match = object()

    for candidate in ranges:
        while True:
            hit = next(basic_filter_languages(languages, [candidate]),
                       no_match)
            if hit is not no_match:
                return hit

            try:
                candidate = candidate.strip_rightmost()
            except ValueError:
                # Nothing left to strip: give up on this range.
                break

    return None
class LanguageMap(dict):
    """
    A :class:`dict` subclass specialized for holding :class:`LanguageTag`
    instances as keys.

    In addition to the interface provided by :class:`dict`, instances of this
    class also have the following method:

    .. automethod:: lookup
    """

    def lookup(self, language_ranges):
        """
        Perform an RFC4647 language range lookup on the keys in the
        dictionary. `language_ranges` must be a sequence of
        :class:`LanguageRange` instances.

        Return the entry in the dictionary with a key as produced by
        `lookup_language`. If `lookup_language` does not find a match and the
        mapping contains an entry with key :data:`None`, that entry is
        returned, otherwise :class:`KeyError` is raised.
        """
        # The None key is the fallback entry; it takes no part in matching.
        candidates = sorted(key for key in self if key is not None)
        # lookup_language returns None when nothing matches, which is exactly
        # the key of the fallback entry.
        return self[lookup_language(candidates, language_ranges)]