Source code for aioxmpp.muc.xso

import aioxmpp.forms
import aioxmpp.stanza
import aioxmpp.stringprep
import aioxmpp.xso as xso

from aioxmpp.utils import namespaces


namespaces.xep0045_muc = "http://jabber.org/protocol/muc"
namespaces.xep0045_muc_user = "http://jabber.org/protocol/muc#user"
namespaces.xep0045_muc_admin = "http://jabber.org/protocol/muc#admin"
namespaces.xep0045_muc_owner = "http://jabber.org/protocol/muc#owner"


[docs]class History(xso.XSO): TAG = (namespaces.xep0045_muc, "history") maxchars = xso.Attr( "maxchars", type_=xso.Integer(), default=None, ) maxstanzas = xso.Attr( "maxstanzas", type_=xso.Integer(), default=None, ) seconds = xso.Attr( "seconds", type_=xso.Integer(), default=None, ) since = xso.Attr( "since", type_=xso.DateTime(), default=None, ) def __init__(self, *, maxchars=None, maxstanzas=None, seconds=None, since=None): super().__init__() self.maxchars = maxchars self.maxstanzas = maxstanzas self.seconds = seconds self.since = since
[docs]class GenericExt(xso.XSO): TAG = (namespaces.xep0045_muc, "x") history = xso.Child([History]) password = xso.ChildText( (namespaces.xep0045_muc, "password"), default=None )
aioxmpp.stanza.Presence.xep0045_muc = xso.Child([ GenericExt ]) aioxmpp.stanza.Message.xep0045_muc = xso.Child([ GenericExt ])
[docs]class Status(xso.XSO): TAG = (namespaces.xep0045_muc_user, "status") code = xso.Attr( "code", type_=xso.Integer() ) def __init__(self, code): super().__init__() self.code = code
class StatusCodeList(xso.AbstractType): def parse(self, item): return item.code def format(self, code): item = Status(code) return item def get_formatted_type(self): return Status
[docs]class DestroyNotification(xso.XSO): TAG = (namespaces.xep0045_muc_user, "destroy") reason = xso.ChildText( (namespaces.xep0045_muc_user, "reason"), default=None ) jid = xso.Attr( "jid", type_=xso.JID(), default=None )
[docs]class Decline(xso.XSO): TAG = (namespaces.xep0045_muc_user, "decline") from_ = xso.Attr( "from", type_=xso.JID(), default=None ) to = xso.Attr( "to", type_=xso.JID(), default=None ) reason = xso.ChildText( (namespaces.xep0045_muc_user, "reason"), default=None )
[docs]class Invite(xso.XSO): TAG = (namespaces.xep0045_muc_user, "invite") from_ = xso.Attr( "from", type_=xso.JID(), default=None ) to = xso.Attr( "to", type_=xso.JID(), default=None ) reason = xso.ChildText( (namespaces.xep0045_muc_user, "reason"), default=None )
class ActorBase(xso.XSO): jid = xso.Attr( "jid", type_=xso.JID(), default=None, ) nick = xso.Attr( "nick", type_=xso.String(aioxmpp.stringprep.resourceprep), default=None ) class ItemBase(xso.XSO): affiliation = xso.Attr( "affiliation", validator=xso.RestrictToSet({ "admin", "member", "none", "outcast", "owner", None, }), validate=xso.ValidateMode.ALWAYS, default=None, ) jid = xso.Attr( "jid", type_=xso.JID(), default=None, ) nick = xso.Attr( "nick", type_=xso.String(aioxmpp.stringprep.resourceprep), default=None ) role = xso.Attr( "role", validator=xso.RestrictToSet({ "moderator", "none", "participant", "visitor", None, }), validate=xso.ValidateMode.ALWAYS, default=None, ) def __init__(self, affiliation=None, jid=None, nick=None, role=None, reason=None): super().__init__() self.affiliation = affiliation self.jid = jid self.nick = nick self.role = role self.reason = reason
[docs]class UserActor(ActorBase): TAG = (namespaces.xep0045_muc_user, "actor")
[docs]class Continue(xso.XSO): TAG = (namespaces.xep0045_muc_user, "continue") thread = xso.Attr( "thread", type_=aioxmpp.stanza.Thread.identifier.type_, default=None, )
[docs]class UserItem(ItemBase): TAG = (namespaces.xep0045_muc_user, "item") actor = xso.Child([UserActor]) continue_ = xso.Child([Continue]) reason = xso.ChildText( (namespaces.xep0045_muc_user, "reason"), default=None )
[docs]class UserExt(xso.XSO): TAG = (namespaces.xep0045_muc_user, "x") status_codes = xso.ChildValueList( StatusCodeList(), container_type=set ) destroy = xso.Child([DestroyNotification]) decline = xso.Child([Decline]) invites = xso.ChildList([Invite]) items = xso.ChildList([UserItem]) password = xso.ChildText( (namespaces.xep0045_muc_user, "password"), default=None ) def __init__(self, status_codes=[], destroy=None, decline=None, invites=[], items=[], password=None): super().__init__() self.status_codes.update(status_codes) self.destroy = destroy self.decline = decline self.invites.extend(invites) self.items.extend(items) self.password = password
aioxmpp.stanza.Presence.xep0045_muc_user = xso.Child([ UserExt ]) aioxmpp.stanza.Message.xep0045_muc_user = xso.Child([ UserExt ])
[docs]class AdminActor(ActorBase): TAG = (namespaces.xep0045_muc_admin, "actor")
[docs]class AdminItem(ItemBase): TAG = (namespaces.xep0045_muc_admin, "item") actor = xso.Child([AdminActor]) continue_ = xso.Child([Continue]) reason = xso.ChildText( (namespaces.xep0045_muc_admin, "reason"), default=None )
@aioxmpp.stanza.IQ.as_payload_class
[docs]class AdminQuery(xso.XSO): TAG = (namespaces.xep0045_muc_admin, "query") items = xso.ChildList([AdminItem]) def __init__(self, *, items=[]): super().__init__() self.items[:] = items
[docs]class DestroyRequest(xso.XSO): TAG = (namespaces.xep0045_muc_owner, "destroy") reason = xso.ChildText( (namespaces.xep0045_muc_owner, "reason"), default=None ) password = xso.ChildText( (namespaces.xep0045_muc_owner, "password"), default=None ) jid = xso.Attr( "jid", type_=xso.JID(), default=None )
@aioxmpp.stanza.IQ.as_payload_class
[docs]class OwnerQuery(xso.XSO): TAG = (namespaces.xep0045_muc_owner, "query") destroy = xso.Child([DestroyRequest]) form = xso.Child([aioxmpp.forms.Data])