"""
:mod:`~aioxmpp.stream_xsos` --- Non-stanza stream-level XSOs
############################################################
This module contains XSO models for stream-level elements which are not
stanzas.
General XSOs
============
.. autoclass:: StreamError()
.. autoclass:: StreamFeatures()
Stream management related XSOs
==============================
.. autoclass:: SMXSO()
.. autoclass:: SMRequest()
.. autoclass:: SMAcknowledgement()
.. autoclass:: SMEnable()
.. autoclass:: SMEnabled()
.. autoclass:: SMResume()
.. autoclass:: SMResumed()
"""
import itertools
from . import xso, errors, stanza
from .utils import namespaces
[docs]class StreamError(xso.XSO):
"""
XSO representing a stream error.
.. attribute:: text
The text content of the stream error.
.. attribute:: condition
The RFC 6120 stream error condition.
"""
TAG = (namespaces.xmlstream, "error")
text = xso.ChildText(
tag=(namespaces.streams, "text"),
attr_policy=xso.UnknownAttrPolicy.DROP,
default=None,
declare_prefix=None)
condition = xso.ChildTag(
tags=[
"bad-format",
"bad-namespace-prefix",
"conflict",
"connection-timeout",
"host-gone",
"host-unknown",
"improper-addressing",
"internal-server-error",
"invalid-from",
"invalid-namespace",
"invalid-xml",
"not-authorized",
"not-well-formed",
"policy-violation",
"remote-connection-failed",
"reset",
"resource-constraint",
"restricted-xml",
"see-other-host",
"system-shutdown",
"undefined-condition",
"unsupported-encoding",
"unsupported-feature",
"unsupported-stanza-type",
"unsupported-version",
],
default_ns=namespaces.streams,
allow_none=False,
declare_prefix=None,
)
def __init__(self,
condition=(namespaces.streams, "undefined-condition"),
text=None):
super().__init__()
self.condition = condition
self.text = text
@classmethod
def from_exception(cls, exc):
instance = cls()
instance.text = exc.text
instance.condition = exc.condition
return instance
def to_exception(self):
return errors.StreamError(
condition=self.condition,
text=self.text)
[docs]class StreamFeatures(xso.XSO):
"""
XSO for collecting the supported stream features the remote advertises.
To register a stream feature, use :meth:`register_child` with the
:attr:`features` descriptor. A more fancy way to do the same thing is to
use the :meth:`as_feature_class` classmethod as decorator for your feature
XSO class.
Adding new feature classes:
.. automethod:: as_feature_class
Querying features:
.. method:: stream_features[FeatureClass]
Obtain the first feature XSO which matches the `FeatureClass`. If no
such XSO is contained in the :class:`StreamFeatures` instance
`stream_features`, :class:`KeyError` is raised.
.. method:: stream_features[FeatureClass] = feature
Replace the stream features belonging to the given `FeatureClass` with
the `feature` XSO.
If the `FeatureClass` does not match the type of the `feature` XSO, a
:class:`TypeError` is raised.
It is legal to leave the FeatureClass out by specifying ``...``
instead. In that case, the class is auto-detected from the `feature`
object assigned.
.. method:: del stream_features[FeatureClass]
If any feature of the given `FeatureClass` type is in the
`stream_features`, they are all removed.
Otherwise, :class:`KeyError` is raised, to stay consistent with other
mapping-like types.
.. automethod:: get_feature
.. automethod:: has_feature
"""
TAG = (namespaces.xmlstream, "features")
DECLARE_NS = {
None: namespaces.xmlstream
}
# we drop unknown children
UNKNOWN_CHILD_POLICY = xso.UnknownChildPolicy.DROP
features = xso.ChildMap([])
cruft = xso.Collector()
@classmethod
[docs] def as_feature_class(cls, other_cls):
cls.register_child(cls.features, other_cls)
return other_cls
def __getitem__(self, feature_cls):
tag = feature_cls.TAG
try:
return self.features[feature_cls.TAG][0]
except IndexError:
raise KeyError(feature_cls) from None
def __setitem__(self, feature_cls, feature):
if feature_cls is Ellipsis:
feature_cls = type(feature)
if not isinstance(feature, feature_cls):
raise ValueError("incorrect XSO class supplied")
self.features[feature_cls.TAG][:] = [feature]
def __delitem__(self, feature_cls):
items = self.features[feature_cls.TAG]
if not items:
raise KeyError(feature_cls)
items.clear()
def __contains__(self, other):
raise TypeError("membership test not supported")
[docs] def has_feature(self, feature_cls):
"""
Return :data:`True` if the stream features contain a feature of the
given `feature_cls` type. :data:`False` is returned otherwise.
"""
return feature_cls.TAG in self.features
[docs] def get_feature(self, feature_cls, default=None):
"""
If a feature of the given `feature_cls` type is contained in the
current stream features set, the first such instance is returned.
Otherwise, `default` is returned.
"""
try:
return self[feature_cls]
except KeyError:
return default
def __iter__(self):
return itertools.chain(*self.features.values())
@StreamFeatures.as_feature_class
class StreamManagementFeature(xso.XSO):
"""
Stream management stream feature
"""
TAG = (namespaces.stream_management, "sm")
class Required(xso.XSO):
TAG = (namespaces.stream_management, "required")
class Optional(xso.XSO):
TAG = (namespaces.stream_management, "optional")
required = xso.Child([Required])
optional = xso.Child([Optional])
[docs]class SMXSO(xso.XSO):
"""
Base class for stream-management related XSOs.
This base class merely defines the namespaces to declare when serializing
the data.
"""
DECLARE_NS = {
None: namespaces.stream_management
}
[docs]class SMRequest(SMXSO):
"""
A request for an SM acknowledgement (see :class:`SMAcknowledgement`).
"""
TAG = (namespaces.stream_management, "r")
[docs]class SMAcknowledgement(SMXSO):
"""
Response to a :class:`SMRequest`.
.. attribute:: counter
The counter as received by the remote side.
"""
TAG = (namespaces.stream_management, "a")
counter = xso.Attr(
"h",
type_=xso.Integer()
)
def __init__(self, counter=0, **kwargs):
super().__init__(**kwargs)
self.counter = counter
[docs]class SMEnable(SMXSO):
"""
Request to enable stream management.
.. attribute:: resume
Set this to :data:`True` to request the capability of resuming the
stream later.
"""
TAG = (namespaces.stream_management, "enable")
resume = xso.Attr(
"resume",
type_=xso.Bool(),
default=False
)
def __init__(self, resume=False):
super().__init__()
self.resume = resume
[docs]class SMEnabled(SMXSO):
"""
Response to a :class:`SMEnable` request.
.. attribute:: resume
If :data:`True`, the peer allows resumption of the stream.
.. attribute:: id_
The SM-ID of the stream. This is required to resume later.
.. attribute:: location
A hostname-port pair which defines to which host the client shall
connect to resume the stream.
"""
TAG = (namespaces.stream_management, "enabled")
resume = xso.Attr(
"resume",
type_=xso.Bool(),
default=False
)
id_ = xso.Attr("id")
location = xso.Attr(
"location",
type_=xso.ConnectionLocation(),
default=None)
max_ = xso.Attr(
"max",
type_=xso.Integer(),
default=None)
def __init__(self,
resume=False,
id_=None,
location=None,
max_=None):
super().__init__()
self.resume = resume
self.id_ = id_
self.location = location
self.max_ = max_
[docs]class SMResume(SMXSO):
"""
Request resumption of a previously interrupted SM stream.
.. attribute:: counter
Set this to the value of the local incoming stanza counter.
.. attribute:: previd
Set this to the SM-ID, as received in :class:`SMEnabled`.
"""
TAG = (namespaces.stream_management, "resume")
counter = xso.Attr(
"h",
type_=xso.Integer()
)
previd = xso.Attr("previd")
def __init__(self, counter, previd):
super().__init__()
self.counter = counter
self.previd = previd
[docs]class SMResumed(SMXSO):
"""
Notification that SM resumption was successful, in response to
:class:`SMResume`.
.. attribute:: counter
The stanza counter of the remote side.
.. attribute:: previd
The SM-ID of the stream.
"""
TAG = (namespaces.stream_management, "resumed")
counter = xso.Attr(
"h",
type_=xso.Integer())
previd = xso.Attr("previd")
def __init__(self, counter, previd):
super().__init__()
self.counter = counter
self.previd = previd
class SMFailed(SMXSO):
"""
Server response to :class:`SMEnable` or :class:`SMResume` if stream
management fails.
"""
TAG = (namespaces.stream_management, "failed")
condition = xso.ChildTag(
tags=stanza.STANZA_ERROR_TAGS,
default_ns=namespaces.stanzas,
allow_none=False,
declare_prefix=None,
)
def __init__(self,
condition=(namespaces.stanzas, "undefined-condition"),
**kwargs):
super().__init__(**kwargs)
if condition is not None:
self.condition = condition