Source code for aioxmpp.xml

########################################################################
# File name: xml.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
"""
:mod:`~aioxmpp.xml` --- XML utilities and interfaces for handling XMPP XML streams
#######################################################################################

This module provides a few classes and functions which are useful when
generating and parsing XML streams for XMPP.

Generating XML streams
======================

The most useful class here is the :class:`XMPPXMLGenerator`:

.. autoclass:: XMPPXMLGenerator

.. autoclass:: XMLStreamWriter

Processing XML streams
======================

To convert streams of SAX events to :class:`~.stanza_model.XSO`
instances, the following classes and functions can be used:

.. autoclass:: XMPPXMLProcessor

.. autoclass:: XMPPLexicalHandler

.. autofunction:: make_parser

Utility functions
=================

.. autofunction:: serialize_single_xso

.. autofunction:: write_single_xso

.. autofunction:: read_xso

.. autofunction:: read_single_xso

"""  # NOQA: E501

import copy
import contextlib
import io

import xml.sax
import xml.sax.saxutils

from enum import Enum

from . import errors, structs, xso
from .utils import namespaces


_NAME_START_CHAR = [
    [ord(":"), ord("_")],
    range(ord("a"), ord("z")+1),
    range(ord("A"), ord("Z")+1),
    range(0xc0, 0xd7),
    range(0xd8, 0xf7),
    range(0xf8, 0x300),
    range(0x370, 0x37e),
    range(0x37f, 0x2000),
    range(0x200c, 0x200e),
    range(0x2070, 0x2190),
    range(0x2c00, 0x2ff0),
    range(0x3001, 0xd800),
    range(0xf900, 0xfdd0),
    range(0xfdf0, 0xfffe),
    range(0x10000, 0xf0000),
]

_NAME_CHAR = _NAME_START_CHAR + [
    [ord("-"), ord("."), 0xb7],
    range(ord("0"), ord("9")+1),
    range(0x0300, 0x0370),
    range(0x203f, 0x2041),
]
_NAME_CHAR.sort(key=lambda x: x[0])


def xmlValidateNameValue_str(s):
    if not s:
        return False
    ch = ord(s[0])
    if not any(ch in range_ for range_ in _NAME_START_CHAR):
        return False
    return all(
        any(ch in range_ for range_ in _NAME_CHAR)
        for ch in map(ord, s)
    )


def is_valid_cdata_str(s):
    for c in s:
        o = ord(c)
        if o >= 32:
            continue
        if o < 9 or 11 <= o <= 12 or 14 <= o <= 31:
            return False

    return True


[docs]class XMPPXMLGenerator: """ Class to generate XMPP-conforming XML bytes. :param out: File-like object to which the bytes are written. :param short_empty_elements: Write empty elements as ``<foo/>`` instead of ``<foo></foo>``. :type short_empty_elements: :class:`bool` :param sorted_attributes: Sort the attributes in the output. Note: this comes with a performance penalty. See below. :type sorted_attributes: :class:`bool` :param additional_escapes: Sequence of characters to escape in CDATA. :type additional_escapes: :class:`~collections.abc.Iterable` of 1-codepoint :class:`str` objects. :class:`XMPPXMLGenerator` works similar to :class:`xml.sax.saxutils.XMLGenerator`, but has a few key differences: * It supports **only** namespace-conforming XML documents * It automatically chooses namespace prefixes if a namespace has not been declared, while avoiding to use prefixes at all if possible * It is in general stricter on (explicit) namespace declarations, to avoid ambiguities * It always uses utf-8 ☺ * It allows explicit flushing `out` must be a file-like supporting both :meth:`file.write` and :meth:`file.flush`. If `short_empty_elements` is true, empty elements are rendered as ``<foo/>`` instead of ``<foo></foo>``, unless a flush occurs before the call to :meth:`endElementNS`, in which case the opening is finished before flushing, thus the long form is generated. If `sorted_attributes` is true, attributes are emitted in the lexical order of their qualified names (except for namespace declarations, which are always sorted and always before the normal attributes). The default is not to do this, for performance. During testing, however, it is useful to have a consistent oder on the attributes. All characters in `additional_escapes` are escaped using XML entities. Note that ``<``, ``>`` and ``&`` are always escaped. `additional_escapes` is converted to a dictionary for use with :func:`~xml.sax.saxutils.escape` and :func:`~xml.sax.saxutils.quoteattr`. Passing a dictionary to `additional_escapes` or passing multi-character strings as elements of `additional_escapes` is **not** supported since it may be (ab-)used to create invalid XMPP XML. `additional_escapes` affects both CDATA in XML elements as well as attribute values. Implementation of the SAX content handler interface (see :class:`xml.sax.handler.ContentHandler`): .. automethod:: startDocument .. automethod:: startPrefixMapping(prefix, uri) .. automethod:: startElementNS .. automethod:: characters .. automethod:: endElementNS .. automethod:: endPrefixMapping .. automethod:: endDocument The following SAX content handler methods have deliberately not been implemented: .. automethod:: setDocumentLocator .. automethod:: skippedEntity .. automethod:: ignorableWhitespace .. automethod:: startElement .. automethod:: endElement These methods produce content which is invalid in XMPP XML streams and thus always raise :class:`ValueError`: .. automethod:: processingInstruction In addition to the SAX content handler interface, the following methods are provided: .. automethod:: flush .. automethod:: buffer """ def __init__(self, out, short_empty_elements=True, sorted_attributes=False, additional_escapes=[]): self._write = out.write if hasattr(out, "flush"): self._flush = out.flush else: self._flush = None self._short_empty_elements = short_empty_elements self._sorted_attributes = sorted_attributes self._additional_escapes = { char: "&#{};".format(ord(char)) for char in additional_escapes } # NOTE: when adding state, make sure to handle it in buffer() and to # add tests that buffer() handles it correctly self._ns_map_stack = [({}, set(), 0)] self._curr_ns_map = {} self._pending_start_element = False self._ns_prefixes_floating_in = {} self._ns_prefixes_floating_out = set() self._ns_auto_prefixes_floating_in = set() self._ns_decls_floating_in = {} self._ns_counter = -1 # for buffer() self._buf = None self._buf_in_use = False def _roll_prefix(self, attr): if not attr and None not in self._ns_prefixes_floating_in: return None prefix_number = self._ns_counter + 1 while True: prefix = "ns{}".format(prefix_number) if prefix not in self._ns_prefixes_floating_in: break prefix_number += 1 self._ns_counter = prefix_number return prefix def _qname(self, name, attr=False): if not isinstance(name, tuple): raise ValueError("names must be tuples") if ":" in name[1] or not xmlValidateNameValue_str(name[1]): raise ValueError("invalid name: {!r}".format(name[1])) if name[0]: if name[0] == "http://www.w3.org/XML/1998/namespace": return "xml:" + name[1] try: prefix = self._ns_decls_floating_in[name[0]] if attr and prefix is None: raise KeyError() except KeyError: try: prefix = self._curr_ns_map[name[0]] if prefix in self._ns_prefixes_floating_in: raise KeyError() if attr and prefix is None: raise KeyError() except KeyError: # namespace is undeclared, we have to declare it.. prefix = self._roll_prefix(attr) self.startPrefixMapping(prefix, name[0], auto=True) if prefix: return ":".join((prefix, name[1])) elif (not attr and (None in self._curr_ns_map or None in self._ns_prefixes_floating_in)): raise ValueError("cannot create unnamespaced element when " "prefixless namespace is bound") return name[1] def _finish_pending_start_element(self): if not self._pending_start_element: return self._pending_start_element = False self._write(b">") def _pin_floating_ns_decls(self, old_counter): if self._ns_prefixes_floating_out: raise RuntimeError("namespace prefix has not been closed") new_decls = self._ns_decls_floating_in new_prefixes = self._ns_prefixes_floating_in old_ns_map = self._curr_ns_map self._ns_map_stack.append( ( old_ns_map, set(new_prefixes) - self._ns_auto_prefixes_floating_in, old_counter ) ) new_ns_map = dict(new_decls) cleared_new_prefixes = dict(new_prefixes) for uri, prefix in old_ns_map.items(): try: new_uri = new_prefixes[prefix] except KeyError: pass else: if new_uri != uri: # -> the entry must be dropped because the prefix is # re-assigned continue # use setdefault: new entries (as assigned in new_ns_map = # dict(...)) need to win over old entries new_ns_map.setdefault(uri, prefix) try: new_uri = cleared_new_prefixes[prefix] except KeyError: pass else: if new_uri == uri: del cleared_new_prefixes[prefix] self._curr_ns_map = new_ns_map self._ns_decls_floating_in = {} self._ns_prefixes_floating_in = {} self._ns_auto_prefixes_floating_in.clear() return cleared_new_prefixes
[docs] def startDocument(self): """ Start the document. This method *must* be called before any other content handler method. """ # yes, I know the doctext is not enforced. It might become enforced in # a later version though, when I find a compelling reason why it is # needed. self._write(b'<?xml version="1.0"?>')
[docs] def startPrefixMapping(self, prefix, uri, *, auto=False): """ Start a prefix mapping which maps the given `prefix` to the given `uri`. Note that prefix mappings are handled transactional. All announcements of prefix mappings are collected until the next call to :meth:`startElementNS`. At that point, the mappings are collected and start to override the previously declared mappings until the corresponding :meth:`endElementNS` call. Also note that calling :meth:`startPrefixMapping` is not mandatory; you can use any namespace you like at any time. If you use a namespace whose URI has not been associated with a prefix yet, a free prefix will automatically be chosen. To avoid unnecessary performance penalties, do not use prefixes of the form ``"ns{:d}".format(n)``, for any non-negative number of `n`. It is however required to call :meth:`endPrefixMapping` after a :meth:`endElementNS` call for all namespaces which have been announced directly before the :meth:`startElementNS` call (except for those which have been chosen automatically). Not doing so will result in a :class:`RuntimeError` at the next :meth:`startElementNS` or :meth:`endElementNS` call. During a transaction, it is not allowed to declare the same prefix multiple times. """ if (prefix is not None and (prefix == "xml" or prefix == "xmlns" or not xmlValidateNameValue_str(prefix) or ":" in prefix)): raise ValueError("not a valid prefix: {!r}".format(prefix)) if prefix in self._ns_prefixes_floating_in: raise ValueError("prefix already declared for next element") if auto: self._ns_auto_prefixes_floating_in.add(prefix) self._ns_prefixes_floating_in[prefix] = uri self._ns_decls_floating_in[uri] = prefix
[docs] def startElementNS(self, name, qname, attributes=None): """ Start a sub-element. `name` must be a tuple of ``(namespace_uri, localname)`` and `qname` is ignored. `attributes` must be a dictionary mapping attribute tag tuples (``(namespace_uri, attribute_name)``) to string values. To use unnamespaced attributes, `namespace_uri` can be false (e.g. :data:`None` or the empty string). To use unnamespaced elements, `namespace_uri` in `name` must be false **and** no namespace without prefix must be currently active. If a namespace without prefix is active and `namespace_uri` in `name` is false, :class:`ValueError` is raised. Attribute values are of course automatically escaped. """ self._finish_pending_start_element() old_counter = self._ns_counter qname = self._qname(name) if attributes: attrib = [ (self._qname(attrname, attr=True), value) for attrname, value in attributes.items() ] for attrqname, _ in attrib: if attrqname == "xmlns": raise ValueError("xmlns not allowed as attribute name") else: attrib = [] pending_prefixes = self._pin_floating_ns_decls(old_counter) self._write(b"<") self._write(qname.encode("utf-8")) if None in pending_prefixes: uri = pending_prefixes.pop(None) self._write(b" xmlns=") self._write(xml.sax.saxutils.quoteattr(uri).encode("utf-8")) for prefix, uri in sorted(pending_prefixes.items()): self._write(b" xmlns") if prefix: self._write(b":") self._write(prefix.encode("utf-8")) self._write(b"=") self._write( xml.sax.saxutils.quoteattr(uri).encode("utf-8") ) if self._sorted_attributes: attrib.sort() for attrname, value in attrib: self._write(b" ") self._write(attrname.encode("utf-8")) self._write(b"=") self._write( xml.sax.saxutils.quoteattr( value, self._additional_escapes, ).encode("utf-8") ) if self._short_empty_elements: self._pending_start_element = name else: self._write(b">")
[docs] def endElementNS(self, name, qname): """ End a previously started element. `name` must be a ``(namespace_uri, localname)`` tuple and `qname` is ignored. """ if self._ns_prefixes_floating_out: raise RuntimeError("namespace prefix has not been closed") if self._pending_start_element == name: self._pending_start_element = False self._write(b"/>") else: self._write(b"</") self._write(self._qname(name).encode("utf-8")) self._write(b">") self._curr_ns_map, self._ns_prefixes_floating_out, self._ns_counter = \ self._ns_map_stack.pop()
[docs] def endPrefixMapping(self, prefix): """ End a prefix mapping declared with :meth:`startPrefixMapping`. See there for more details. """ self._ns_prefixes_floating_out.remove(prefix)
[docs] def startElement(self, name, attributes=None): """ Not supported; only elements with proper namespacing are supported by this generator. """ raise NotImplementedError("namespace-incorrect documents are " "not supported")
[docs] def characters(self, chars): """ Put character data in the currently open element. Special characters (such as ``<``, ``>`` and ``&``) are escaped. If `chars` contains any ASCII control character, :class:`ValueError` is raised. """ self._finish_pending_start_element() if not is_valid_cdata_str(chars): raise ValueError("control characters are not allowed in " "well-formed XML") self._write(xml.sax.saxutils.escape( chars, self._additional_escapes, ).encode("utf-8"))
[docs] def processingInstruction(self, target, data): """ Not supported; explicitly forbidden in XMPP. Raises :class:`ValueError`. """ raise ValueError("restricted xml: processing instruction forbidden")
[docs] def skippedEntity(self, name): """ Not supported; there is no use case. Raises :class:`NotImplementedError`. """ raise NotImplementedError("skippedEntity")
[docs] def setDocumentLocator(self, locator): """ Not supported; there is no use case. Raises :class:`NotImplementedError`. """ raise NotImplementedError("setDocumentLocator")
[docs] def ignorableWhitespace(self, whitespace): """ Not supported; could be mapped to :meth:`characters`. """ raise NotImplementedError("ignorableWhitespace")
[docs] def endElement(self, name): """ Not supported; only elements with proper namespacing are supported by this generator. """ self.startElement(name)
[docs] def endDocument(self): """ This must be called at the end of the document. Note that this does not call :meth:`flush`. """
[docs] def flush(self): """ Call :meth:`flush` on the object passed to the `out` argument of the constructor. In addition, any unfinished opening tags are finished, which can lead to expansion of the generated XML code (see note on the `short_empty_elements` argument at the class documentation). """ self._finish_pending_start_element() if self._flush: self._flush()
@contextlib.contextmanager def _save_state(self): """ Helper context manager for :meth:`buffer` which saves the whole state. This is broken out in a separate method for readability and tested indirectly by testing :meth:`buffer`. """ ns_prefixes_floating_in = copy.copy(self._ns_prefixes_floating_in) ns_prefixes_floating_out = copy.copy(self._ns_prefixes_floating_out) ns_decls_floating_in = copy.copy(self._ns_decls_floating_in) curr_ns_map = copy.copy(self._curr_ns_map) ns_map_stack = copy.copy(self._ns_map_stack) pending_start_element = self._pending_start_element ns_counter = self._ns_counter # XXX: I have been unable to find a test justifying copying this :/ # for completeness, I’m still doing it ns_auto_prefixes_floating_in = \ copy.copy(self._ns_auto_prefixes_floating_in) try: yield except: # NOQA: E722 self._ns_prefixes_floating_in = ns_prefixes_floating_in self._ns_prefixes_floating_out = ns_prefixes_floating_out self._ns_decls_floating_in = ns_decls_floating_in self._pending_start_element = pending_start_element self._curr_ns_map = curr_ns_map self._ns_map_stack = ns_map_stack self._ns_counter = ns_counter self._ns_auto_prefixes_floating_in = ns_auto_prefixes_floating_in raise
[docs] @contextlib.contextmanager def buffer(self): """ Context manager to temporarily buffer the output. :raise RuntimeError: If two :meth:`buffer` context managers are used nestedly. If the context manager is left without exception, the buffered output is sent to the actual sink. Otherwise, it is discarded. In addition to the output being buffered, buffer also captures the entire state of the XML generator and restores it to the previous state if the context manager is left with an exception. This can be used to fail-safely attempt to serialise a subtree and return to a well-defined state if serialisation fails. :meth:`flush` is not called automatically. If :meth:`flush` is called while a :meth:`buffer` context manager is active, no actual flushing happens (but unfinished opening tags are closed as usual, see the `short_empty_arguments` parameter). """ if self._buf_in_use: raise RuntimeError("nested use of buffer() is not supported") self._buf_in_use = True old_write = self._write old_flush = self._flush if self._buf is None: self._buf = io.BytesIO() else: try: self._buf.seek(0) self._buf.truncate() except BufferError: # we need a fresh buffer for this, the other is still in use. self._buf = io.BytesIO() self._write = self._buf.write self._flush = None try: with self._save_state(): yield old_write(self._buf.getbuffer()) if old_flush: old_flush() finally: self._buf_in_use = False self._write = old_write self._flush = old_flush
[docs]class XMLStreamWriter: """ A convenient class to write a standard conforming XML stream. :param f: File-like object to write to. :param to: Address to which the connection is addressed. :type to: :class:`aioxmpp.JID` :param from_: Optional address from which the connection originates. :type from_: :class:`aioxmpp.JID` :param version: Version of the XML stream protocol. :type version: :class:`tuple` of (:class:`int`, :class:`int`) :param nsmap: Mapping of namespaces to declare at the stream header. .. note:: The constructor *does not* send a stream header. :meth:`start` must be called explicitly to send a stream header. The generated stream header follows :rfc:`6120` and has the ``to`` and ``version`` attributes as well as optionally the ``from`` attribute (controlled by `from_`). In addition, the namespace prefixes defined by `nsmap` (mapping prefixes to namespace URIs) are declared on the stream header. .. note:: It is unfortunately not allowed to use namespace prefixes in stanzas which were declared in stream headers as convenient as that would be. The option is thus only useful to declare the default namespace for stanzas. .. autoattribute:: closed The following methods are used to generate output: .. automethod:: start .. automethod:: send .. automethod:: abort .. automethod:: close """ def __init__(self, f, to, from_=None, version=(1, 0), nsmap={}, sorted_attributes=False): super().__init__() self._to = to self._from = from_ self._version = version self._writer = XMPPXMLGenerator( out=f, short_empty_elements=True, sorted_attributes=sorted_attributes) self._nsmap_to_use = { "stream": namespaces.xmlstream } self._nsmap_to_use.update(nsmap) self._closed = False @property def closed(self): """ True if the stream has been closed by :meth:`abort` or :meth:`close`. Read-only. """ return self._closed
[docs] def start(self): """ Send the stream header as described above. """ attrs = { (None, "to"): str(self._to), (None, "version"): ".".join(map(str, self._version)) } if self._from: attrs[None, "from"] = str(self._from) self._writer.startDocument() for prefix, uri in self._nsmap_to_use.items(): self._writer.startPrefixMapping(prefix, uri) self._writer.startElementNS( (namespaces.xmlstream, "stream"), None, attrs) self._writer.flush()
[docs] def send(self, xso): """ Send a single XML stream object. :param xso: Object to serialise and send. :type xso: :class:`aioxmpp.xso.XSO` :raises Exception: from any serialisation errors, usually :class:`ValueError`. Serialise the `xso` and send it over the stream. If any serialisation error occurs, no data is sent over the stream and the exception is re-raised; the :meth:`send` method thus provides strong exception safety. .. warning:: The behaviour of :meth:`send` after :meth:`abort` or :meth:`close` and before :meth:`start` is undefined. """ with self._writer.buffer(): xso.xso_serialise_to_sax(self._writer)
[docs] def abort(self): """ Abort the stream. The stream is flushed and the internal data structures are cleaned up. No stream footer is sent. The stream is :attr:`closed` afterwards. If the stream is already :attr:`closed`, this method does nothing. """ if self._closed: return self._closed = True self._writer.flush() del self._writer
[docs] def close(self): """ Close the stream. The stream footer is sent and the internal structures are cleaned up. If the stream is already :attr:`closed`, this method does nothing. """ if self._closed: return self._closed = True self._writer.endElementNS((namespaces.xmlstream, "stream"), None) for prefix in self._nsmap_to_use: self._writer.endPrefixMapping(prefix) self._writer.endDocument() del self._writer
class ProcessorState(Enum): CLEAN = 0 STARTED = 1 STREAM_HEADER_PROCESSED = 2 STREAM_FOOTER_PROCESSED = 3 EXCEPTION_BACKOFF = 4
[docs]class XMPPXMLProcessor: """ This class is a :class:`xml.sax.handler.ContentHandler`. It can be used to parse an XMPP XML stream. When used with a :class:`xml.sax.xmlreader.XMLReader`, it gradually processes the incoming XML stream. If any restricted XML is encountered, an appropriate :class:`~.errors.StreamError` is raised. .. warning:: To achieve compliance with XMPP, it is recommended to use :class:`XMPPLexicalHandler` as lexical handler, using :meth:`xml.sax.xmlreader.XMLReader.setProperty`:: parser.setProperty(xml.sax.handler.property_lexical_handler, XMPPLexicalHandler) Otherwise, invalid XMPP XML such as comments, entity references and DTD declarations will not be caught. **Exception handling**: When an exception occurs while parsing a stream-level element, such as a stanza, the exception is stored internally and exception handling is invoked. During exception handling, all SAX events are dropped, until the stream-level element has been completely processed by the parser. Then, if available, :attr:`on_exception` is called, with the stored exception as the only argument. If :attr:`on_exception` is false (e.g. :data:`None`), the exception is re-raised from the :meth:`endElementNS` handler, in turn most likely destroying the SAX parsers internal state. .. attribute:: on_exception May be a callable or :data:`None`. If not false, the value will get called when exception handling has finished, with the exception as the only argument. .. attribute:: on_stream_footer May be a callable or :data:`None`. If not false, the value will get called whenever a stream footer is processed. .. attribute:: on_stream_header May be a callable or :data:`None`. If not false, the value will get called whenever a stream header is processed. .. autoattribute:: stanza_parser """ def __init__(self): super().__init__() self._state = ProcessorState.CLEAN self._stanza_parser = None self._stored_exception = None self.on_stream_header = None self.on_stream_footer = None self.on_exception = None self.remote_version = None self.remote_from = None self.remote_to = None self.remote_id = None self.remote_lang = None @property def stanza_parser(self): """ A :class:`~.xso.XSOParser` object (or compatible) which will receive the sax-ish events used in :mod:`~aioxmpp.xso`. It is driven using an instance of :class:`~.xso.SAXDriver`. This object can only be set before :meth:`startDocument` has been called (or after :meth:`endDocument` has been called). """ return self._stanza_parser @stanza_parser.setter def stanza_parser(self, value): if self._state != ProcessorState.CLEAN: raise RuntimeError("invalid state: {}".format(self._state)) self._stanza_parser = value self._stanza_parser.lang = self.remote_lang def processingInstruction(self, target, foo): raise errors.StreamError( errors.StreamErrorCondition.RESTRICTED_XML, "processing instructions are not allowed in XMPP" ) def characters(self, characters): if self._state == ProcessorState.EXCEPTION_BACKOFF: pass elif self._state != ProcessorState.STREAM_HEADER_PROCESSED: raise RuntimeError("invalid state: {}".format(self._state)) else: self._driver.characters(characters) def startDocument(self): if self._state != ProcessorState.CLEAN: raise RuntimeError("invalid state: {}".format(self._state)) self._state = ProcessorState.STARTED self._depth = 0 self._driver = xso.SAXDriver(self._stanza_parser) def startElement(self, name, attributes): raise RuntimeError("incorrectly configured parser: " "startElement called (instead of startElementNS)") def endElement(self, name): raise RuntimeError("incorrectly configured parser: " "endElement called (instead of endElementNS)") def endDocument(self): if self._state != ProcessorState.STREAM_FOOTER_PROCESSED: raise RuntimeError("invalid state: {}".format(self._state)) self._state = ProcessorState.CLEAN self._driver = None def startPrefixMapping(self, prefix, uri): pass def endPrefixMapping(self, prefix): pass def startElementNS(self, name, qname, attributes): if self._state == ProcessorState.STREAM_HEADER_PROCESSED: try: self._driver.startElementNS(name, qname, attributes) except Exception as exc: self._stored_exception = exc self._state = ProcessorState.EXCEPTION_BACKOFF self._depth += 1 return elif self._state == ProcessorState.EXCEPTION_BACKOFF: self._depth += 1 return elif self._state != ProcessorState.STARTED: raise RuntimeError("invalid state: {}".format(self._state)) if name != (namespaces.xmlstream, "stream"): raise errors.StreamError( errors.StreamErrorCondition.INVALID_NAMESPACE, "stream has invalid namespace or localname" ) attributes = dict(attributes) try: self.remote_version = tuple( map(int, attributes.pop((None, "version"), "0.9").split(".")) ) except ValueError as exc: raise errors.StreamError( errors.StreamErrorCondition.UNSUPPORTED_VERSION, str(exc) ) remote_to = attributes.pop((None, "to"), None) if remote_to is not None: remote_to = structs.JID.fromstr(remote_to) self.remote_to = remote_to try: self.remote_from = structs.JID.fromstr( attributes.pop((None, "from")) ) except KeyError: raise errors.StreamError( errors.StreamErrorCondition.UNDEFINED_CONDITION, "from attribute required in response header" ) try: self.remote_id = attributes.pop((None, "id")) except KeyError: raise errors.StreamError( errors.StreamErrorCondition.UNDEFINED_CONDITION, "id attribute required in response header" ) try: lang = attributes.pop((namespaces.xml, "lang")) except KeyError: self.remote_lang = None else: self.remote_lang = structs.LanguageTag.fromstr(lang) if self._stanza_parser is not None: self._stanza_parser.lang = self.remote_lang if self.on_stream_header: self.on_stream_header() self._state = ProcessorState.STREAM_HEADER_PROCESSED self._depth += 1 def _end_element_exception_handling(self): self._state = ProcessorState.STREAM_HEADER_PROCESSED exc = self._stored_exception self._stored_exception = None if self.on_exception: self.on_exception(exc) else: raise exc def endElementNS(self, name, qname): if self._state == ProcessorState.STREAM_HEADER_PROCESSED: self._depth -= 1 if self._depth > 0: try: return self._driver.endElementNS(name, qname) except Exception as exc: self._stored_exception = exc self._state = ProcessorState.EXCEPTION_BACKOFF if self._depth == 1: self._end_element_exception_handling() else: if self.on_stream_footer: self.on_stream_footer() self._state = ProcessorState.STREAM_FOOTER_PROCESSED elif self._state == ProcessorState.EXCEPTION_BACKOFF: self._depth -= 1 if self._depth == 1: self._end_element_exception_handling() else: raise RuntimeError("invalid state: {}".format(self._state))
[docs]class XMPPLexicalHandler: """ A `lexical handler <http://www.saxproject.org/apidoc/org/xml/sax/ext/LexicalHandler.html>`_ which rejects certain contents which are invalid in an XMPP XML stream: * comments, * dtd declarations, * non-predefined entities. The class can be used as lexical handler directly; all methods are stateless and can be used both on the class and on objects of the class. """ PREDEFINED_ENTITIES = {"amp", "lt", "gt", "apos", "quot"} @classmethod def comment(cls, data): raise errors.StreamError( errors.StreamErrorCondition.RESTRICTED_XML, "comments are not allowed in XMPP" ) @classmethod def startDTD(cls, name, publicId, systemId): raise errors.StreamError( errors.StreamErrorCondition.RESTRICTED_XML, "DTD declarations are not allowed in XMPP" ) @classmethod def endDTD(cls): pass @classmethod def startCDATA(cls): pass @classmethod def endCDATA(cls): pass @classmethod def startEntity(cls, name): if name not in cls.PREDEFINED_ENTITIES: raise errors.StreamError( errors.StreamErrorCondition.RESTRICTED_XML, "non-predefined entities are not allowed in XMPP" ) @classmethod def endEntity(cls, name): pass
[docs]def make_parser(): """ Create a parser which is suitably configured for parsing an XMPP XML stream. It comes equipped with :class:`XMPPLexicalHandler`. """ p = xml.sax.make_parser() p.setFeature(xml.sax.handler.feature_namespaces, True) p.setFeature(xml.sax.handler.feature_external_ges, False) p.setProperty(xml.sax.handler.property_lexical_handler, XMPPLexicalHandler) return p
[docs]def serialize_single_xso(x): """ Serialize a single XSO `x` to a string. This is potentially very slow and should only be used for debugging purposes. It is generally more efficient to use a :class:`XMPPXMLGenerator` to stream elements. """ buf = io.BytesIO() gen = XMPPXMLGenerator(buf, short_empty_elements=True, sorted_attributes=True) x.xso_serialise_to_sax(gen) return buf.getvalue().decode("utf8")
[docs]def write_single_xso(x, dest): """ Write a single XSO `x` to a binary file-like object `dest`. """ gen = XMPPXMLGenerator(dest, short_empty_elements=True, sorted_attributes=True) x.xso_serialise_to_sax(gen)
[docs]def read_xso(src, xsomap): """ Read a single XSO from a binary file-like input `src` containing an XML document. `xsomap` must be a mapping which maps :class:`~.XSO` subclasses to callables. These will be registered at a newly created :class:`.xso.XSOParser` instance which will be used to parse the document in `src`. The `xsomap` is thus used to determine the class parsing the root element of the XML document. This can be used to support multiple versions. """ xso_parser = xso.XSOParser() for class_, cb in xsomap.items(): xso_parser.add_class(class_, cb) driver = xso.SAXDriver(xso_parser) parser = xml.sax.make_parser() parser.setFeature( xml.sax.handler.feature_namespaces, True) parser.setFeature( xml.sax.handler.feature_external_ges, False) parser.setContentHandler(driver) parser.parse(src)
[docs]def read_single_xso(src, type_): """ Read a single :class:`~.XSO` of the given `type_` from the binary file-like input `src` and return the instance. """ result = None def cb(instance): nonlocal result result = instance read_xso(src, {type_: cb}) return result